use crate::result::Result;
use crate::{TxInSafe, TxOutSafe};
use std::collections::hash_map::HashMap;
use std::fmt::{self, Debug};
use std::fs::OpenOptions;
use std::io::{self, Error, Write};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::path::Path;
use std::sync::Mutex;
use std::thread::ThreadId;
use std::{mem, panic, ptr, slice, str, thread};
/// Maximum number of sections a single `Chaperon` can track: it is the length
/// of the `done` array and the limit asserted by `Chaperon::new_section`.
const MAX_TRANS: usize = 4096;
/// Bookkeeping record of a chaperoned session.
///
/// `Chaperon::new` writes the raw bytes of this struct to the start of the
/// backing file, and `Chaperon::load` maps that file and reinterprets the
/// beginning of the mapping as a `Chaperon`, so the value is accessed in place
/// inside the memory map.
pub struct Chaperon {
/// Number of sections created so far by `new_section`.
len: usize,
/// Cached "finished" flag: set by `completed()` once every section is done,
/// and by the delayed commit/rollback runners.
completed: bool,
/// Per-section completion flags; section `id` is stored at `done[id - 1]`.
done: [bool; MAX_TRANS],
/// Bytes of the backing file name, zero padded.
filename: [u8; 4096],
/// Number of meaningful bytes in `filename`.
filename_len: usize,
/// Volatile state. It is `None` in the image written by `new` and is replaced
/// with a fresh `VData` every time the file is mapped by `load`.
vdata: Option<VData>,
}
/// Volatile companion of a `Chaperon`, rebuilt by `Chaperon::load` on every mapping.
struct VData {
/// Mutable mapping of the backing file; the owning `Chaperon` is located at
/// the start of this mapping.
mmap: memmap::MmapMut,
/// Per-thread callbacks registered through `postpone`, run by
/// `execute_delayed_commits`.
delayed_commit: HashMap<ThreadId, Vec<&'static dyn Fn() -> ()>>,
/// Per-thread callbacks registered through `postpone`, run by
/// `execute_delayed_rollbacks`.
delayed_rollback: HashMap<ThreadId, Vec<&'static dyn Fn() -> ()>>,
/// Per-thread callbacks registered through `postpone`, run after either the
/// commits or the rollbacks.
delayed_clear: HashMap<ThreadId, Vec<&'static dyn Fn() -> ()>>,
/// Initialised to 0 by `VData::new`; not otherwise used in the visible code.
mutex: u8,
}
impl VData {
pub fn new(mmap: memmap::MmapMut) -> Self {
Self {
mmap,
delayed_commit: HashMap::new(),
delayed_rollback: HashMap::new(),
delayed_clear: HashMap::new(),
mutex: 0,
}
}
}
// Marker traits for `Chaperon`.
//
// A `Chaperon` is explicitly NOT `TxOutSafe` (negative impl), while it is
// declared `TxInSafe`, unwind safe, `Send` and `Sync`.
//
// NOTE(review): the `unsafe impl`s below are unchecked promises made to the
// compiler. `Chaperon` contains a memory map and tables of `&'static dyn Fn()`
// callbacks and is reached through raw pointers kept in `CLIST`; confirm that
// sharing it across threads is actually sound for every caller.
impl !TxOutSafe for Chaperon {}
impl UnwindSafe for Chaperon {}
impl RefUnwindSafe for Chaperon {}
unsafe impl TxInSafe for Chaperon {}
unsafe impl Send for Chaperon {}
unsafe impl Sync for Chaperon {}
/// Thin wrapper around a raw pointer that is declared `Send` and `Sync` so it
/// can be stored in a global, mutex-protected table.
///
/// The wrapper never dereferences or frees the pointer; it only stores it and
/// hands it back.
struct SyncBox<T>
where
    T: ?Sized,
{
    data: *mut T,
}

impl<T> SyncBox<T>
where
    T: ?Sized,
{
    /// Wraps the given raw pointer.
    fn new(data: *mut T) -> Self {
        SyncBox { data }
    }

    /// Returns the wrapped raw pointer.
    fn get(&self) -> *mut T {
        let SyncBox { data } = self;
        *data
    }
}

// These impls are unchecked assertions: whoever dereferences the pointer is
// responsible for synchronising access to the pointee.
unsafe impl<T> Sync for SyncBox<T> where T: ?Sized {}
unsafe impl<T> Send for SyncBox<T> where T: ?Sized {}
// Global registry that maps a thread to the chaperon of the session currently
// open on it. Entries are inserted by `new_chaperon`, looked up by
// `current_chaperon` and removed by `drop_chaperon`; every access goes through
// the mutex.
lazy_static!{
static ref CLIST: Mutex<HashMap<ThreadId, SyncBox<Chaperon>>> =
Mutex::new(HashMap::new());
}
/// Creates a chaperon backed by `filename` and registers it for the calling
/// thread.
///
/// Returns the raw pointer to the newly created chaperon; the same pointer is
/// stored in `CLIST` under the current thread id.
///
/// # Errors
///
/// * `"Another chaperoned transaction is open"` when the calling thread
///   already has a registered chaperon.
/// * A message naming the file and the underlying I/O error when the chaperon
///   file cannot be created. This is reported as an error instead of a panic
///   so that callers using `?` can handle it, and so the registry lock is not
///   poisoned by an I/O failure.
fn new_chaperon(filename: &str) -> Result<*mut Chaperon> {
    // A poisoned lock only means some thread panicked while holding it; the
    // map itself is still usable, so recover the guard.
    let mut clist = CLIST.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    let tid = thread::current().id();
    if clist.contains_key(&tid) {
        return Err("Another chaperoned transaction is open".to_string());
    }
    let chaperon: *mut Chaperon = Chaperon::new(filename.to_string())
        .map_err(|e| format!("could not create chaperon file `{}`: {}", filename, e))?;
    // The key is known to be absent, so a plain insert is enough and the
    // pointer can be returned without a second lookup.
    clist.insert(tid, SyncBox::new(chaperon));
    Ok(chaperon)
}
/// Removes the calling thread's entry from `CLIST`, if there is one.
///
/// Only the registry entry (a raw pointer wrapper) is discarded here; the
/// pointed-to chaperon is not touched by this function.
fn drop_chaperon() {
    let tid = thread::current().id();
    CLIST
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .remove(&tid);
}
/// Looks up the chaperon registered for the calling thread.
///
/// Returns the raw pointer stored in `CLIST` for the current thread id, or
/// `None` when the thread has no open chaperoned session. A poisoned registry
/// lock is recovered rather than propagated.
fn current_chaperon() -> Option<*mut Chaperon> {
    let tid = thread::current().id();
    let registry = CLIST.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    registry.get(&tid).map(|entry| entry.get())
}
impl Chaperon {
pub(crate) fn new(filename: String) -> io::Result<&'static mut Self> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&filename)?;
file.set_len(1024 * 1024 * 1)?;
let mut a = Self {
len: 0,
completed: false,
done: [true; MAX_TRANS],
filename: [0; 4096],
filename_len: filename.len(),
vdata: None,
};
let bytes = filename.as_bytes();
for i in 0..usize::min(4096, filename.len()) {
a.filename[i] = bytes[i];
}
file.write_all(a.as_bytes()).unwrap();
unsafe { Self::load(filename) }
}
fn deref(raw: *mut u8) -> &'static mut Self {
union U<'b, K: 'b + ?Sized> {
raw: *mut u8,
ref_mut: &'b mut K,
}
unsafe { U { raw }.ref_mut }
}
/// Views this value as a byte slice of length `size_of::<Self>()` starting at
/// the address of `self`.
fn as_bytes(&self) -> &[u8] {
    let size = std::mem::size_of::<Self>();
    let start = (self as *const Self).cast::<u8>();
    unsafe { std::slice::from_raw_parts(start, size) }
}
/// Maps the chaperon file `filename` into memory and returns the `Chaperon`
/// stored at the beginning of the mapping.
///
/// The mapping itself is moved into the returned chaperon's volatile data, so
/// it stays alive together with the chaperon.
///
/// # Errors
///
/// * The error reported by the operating system when the file cannot be
///   opened (for example because it does not exist) or cannot be mapped.
/// * `InvalidData` when the file is too small to contain a `Chaperon`.
///
/// # Safety
///
/// The start of the file is reinterpreted as a `Chaperon` without any further
/// validation. The caller must guarantee that the file was produced by
/// `Chaperon::new` of a layout-compatible build, and that the returned
/// reference is not used concurrently with another mapping of the same file.
pub unsafe fn load(filename: String) -> io::Result<&'static mut Self> {
    // Opening directly (instead of checking for existence first) avoids a
    // check-then-use race and yields the real OS error on failure.
    let path = Path::new(&filename);
    let file = OpenOptions::new().read(true).write(true).open(path)?;
    let mut mmap = memmap::MmapOptions::new().map_mut(&file)?;
    if mmap.len() < mem::size_of::<Self>() {
        return Err(Error::new(
            io::ErrorKind::InvalidData,
            format!("`{}` is too small to hold a chaperon", filename),
        ));
    }
    let slf = Self::deref(mmap.as_mut_ptr());
    // The bytes currently in `vdata` come from the file and must not be
    // dropped as if they were a live value, so they are overwritten in place.
    ptr::write(&mut slf.vdata, Some(VData::new(mmap)));
    Ok(slf)
}
/// Returns the raw pointer to the chaperon registered for the calling thread,
/// or `None` when there is none. Forwards to `current_chaperon`.
pub(crate) fn current() -> Option<*mut Chaperon> {
current_chaperon()
}
/// Registers a new, not-yet-finished section and returns its 1-based id.
///
/// The section counter is incremented, the matching `done` flag is cleared
/// and the chaperon is then passed to `msync_obj`.
///
/// # Panics
///
/// Panics with "reached max number of attachments" when `MAX_TRANS` sections
/// already exist.
pub(crate) fn new_section(&mut self) -> usize {
    use crate::ll::msync_obj;
    assert!(self.len < MAX_TRANS, "reached max number of attachments");
    self.len += 1;
    let id = self.len;
    self.done[id - 1] = false;
    msync_obj(self);
    id
}
/// Tells whether the section with the 1-based identifier `id` is finished.
///
/// # Panics
///
/// Panics with "index out of range" when `id - 1` is not below the number of
/// sections created so far.
#[inline]
pub(crate) fn is_done(&self, id: usize) -> bool {
    let slot = id - 1;
    assert!(slot < self.len, "index out of range");
    self.done[slot]
}
/// Marks the section with the 1-based identifier `id` as finished and calls
/// `close` once `completed()` reports that everything is done.
///
/// # Panics
///
/// Panics with "index out of range" when `id - 1` is not below the number of
/// sections created so far.
#[inline]
pub(crate) fn finish(&mut self, id: usize) {
    let slot = id - 1;
    assert!(slot < self.len, "index out of range");
    self.done[slot] = true;
    let all_done = self.completed();
    if all_done {
        self.close();
    }
}
/// Reports whether the chaperon is completed.
///
/// A previously cached positive answer is returned immediately. Otherwise the
/// `done` flags of the first `len` sections are inspected in order; the first
/// unfinished one yields `false` without touching the cache, and if there is
/// none the answer is cached in `self.completed` and `true` is returned.
pub(crate) fn completed(&mut self) -> bool {
    if !self.completed {
        let every_section_done = (0..self.len).all(|i| self.done[i]);
        if !every_section_done {
            return false;
        }
        self.completed = true;
    }
    true
}
/// Hook called by `finish` once `completed()` reports `true`; it currently
/// does nothing.
fn close(&self) {
}
pub fn filename(&self) -> &str {
unsafe {
let slice = slice::from_raw_parts(&self.filename[0], self.filename_len);
str::from_utf8(slice).unwrap()
}
}
/// Registers three callbacks for the calling thread: `commit` and `rollback`
/// are queued for `execute_delayed_commits` / `execute_delayed_rollbacks`
/// respectively, and `clear` is queued to run after either of them.
///
/// Nothing is registered when the chaperon has no volatile data.
pub(crate) fn postpone<F: Fn() -> (), R: Fn() -> (), E: Fn() -> ()>(
    &mut self,
    commit: &'static F,
    rollback: &'static R,
    clear: &'static E,
) {
    let vdata = match self.vdata.as_mut() {
        Some(vdata) => vdata,
        None => return,
    };
    let tid = thread::current().id();
    vdata.delayed_commit.entry(tid).or_default().push(commit);
    vdata.delayed_rollback.entry(tid).or_default().push(rollback);
    vdata.delayed_clear.entry(tid).or_default().push(clear);
}
/// Runs the calling thread's postponed commit callbacks in registration
/// order, marks the chaperon as completed, runs the thread's clear callbacks
/// and finally forgets both lists.
///
/// Does nothing when the chaperon has no volatile data. The thread's rollback
/// list is left untouched.
fn execute_delayed_commits(&mut self) {
    let vdata = match self.vdata.as_mut() {
        Some(vdata) => vdata,
        None => return,
    };
    let tid = thread::current().id();
    let commits = vdata.delayed_commit.entry(tid).or_default();
    let clears = vdata.delayed_clear.entry(tid).or_default();
    commits.iter().for_each(|run| run());
    self.completed = true;
    clears.iter().for_each(|run| run());
    vdata.delayed_commit.remove(&tid);
    vdata.delayed_clear.remove(&tid);
}
/// Runs the calling thread's postponed rollback callbacks in registration
/// order, marks the chaperon as completed, runs the thread's clear callbacks
/// and finally forgets both lists.
///
/// Does nothing when the chaperon has no volatile data. The thread's commit
/// list is left untouched.
fn execute_delayed_rollbacks(&mut self) {
    let vdata = match self.vdata.as_mut() {
        Some(vdata) => vdata,
        None => return,
    };
    let tid = thread::current().id();
    let rollbacks = vdata.delayed_rollback.entry(tid).or_default();
    let clears = vdata.delayed_clear.entry(tid).or_default();
    rollbacks.iter().for_each(|run| run());
    self.completed = true;
    clears.iter().for_each(|run| run());
    vdata.delayed_rollback.remove(&tid);
    vdata.delayed_clear.remove(&tid);
}
/// Runs `body` as a chaperoned session backed by the file `filename`.
///
/// A chaperon is created and registered for the calling thread, `body` is
/// executed under `catch_unwind`, and afterwards the callbacks postponed by
/// this thread are run: the commits if `body` returned normally, the
/// rollbacks if it panicked.
///
/// The thread's registration is removed when this function is left, including
/// when one of the postponed callbacks panics. Otherwise a single failing
/// callback would leave the thread registered and make every later session on
/// it fail.
///
/// # Errors
///
/// * The error of `new_chaperon` when the session cannot be started, e.g.
///   because the thread already has an open chaperoned session.
/// * `"Unsuccessful chaperoned transaction"` when `body` panicked.
#[inline]
pub fn session<T, F: FnOnce() -> T>(filename: &str, body: F) -> Result<T>
where
    F: panic::UnwindSafe,
    T: panic::UnwindSafe + TxOutSafe,
{
    /// Unregisters the current thread's chaperon when dropped.
    struct Unregister;
    impl Drop for Unregister {
        fn drop(&mut self) {
            drop_chaperon();
        }
    }

    // SAFETY: the pointer was just produced by `new_chaperon` from the
    // reference returned by `Chaperon::new`, and is registered only for this
    // thread.
    let chaperon = unsafe { &mut *new_chaperon(filename)? };
    // Created only after registration succeeded, so an enclosing session's
    // registration is never removed by a rejected nested call.
    let _unregister = Unregister;

    let outcome = panic::catch_unwind(body);
    match outcome {
        Ok(value) => {
            chaperon.execute_delayed_commits();
            Ok(value)
        }
        Err(_) => {
            chaperon.execute_delayed_rollbacks();
            Err("Unsuccessful chaperoned transaction".to_string())
        }
    }
}
}
impl Drop for Chaperon {
    /// Flushes the memory map when the chaperon carries volatile data.
    ///
    /// # Panics
    ///
    /// Panics when flushing the mapping fails.
    fn drop(&mut self) {
        match &self.vdata {
            Some(volatile) => volatile.mmap.flush().unwrap(),
            None => {}
        }
    }
}
impl Debug for Chaperon {
    /// Writes `{ filename: <name>, len: <n>, [<flags>] }`, where `<flags>` are
    /// the `done` flags of the first `len` sections separated by `", "`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{{ filename: {}, len: {}, [", self.filename(), self.len)?;
        let mut separator = "";
        for index in 0..self.len {
            write!(f, "{}{}", separator, self.done[index])?;
            separator = ", ";
        }
        write!(f, "] }}")
    }
}