#![deny(missing_docs)]
use atom_file::{Data, Storage};
use heap::GHeap;
pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
use std::sync::{Arc, Mutex, RwLock};
mod block;
mod blockpagestg;
mod dividedstg;
mod heap;
mod util;
pub use blockpagestg::BlockPageStg;
#[cfg(feature = "pstd")]
use pstd::collections::BTreeMap;
#[cfg(not(feature = "pstd"))]
use std::collections::BTreeMap;
/// Selects what `AccessPagedData::save` does with the pending write.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum SaveOp {
    /// Commit: the page storage is saved and the current write is ended.
    Save,
    /// Abandon: the page storage is asked to roll back.
    RollBack,
}
/// Interface to the underlying page storage.
///
/// The notes on each method describe how the cache layer in this file uses
/// it; the precise semantics are defined by the implementations.
pub trait PageStorage: Send + Sync {
    /// Whether the storage is new.
    /// NOTE(review): presumably "freshly created, needs initialising" — confirm
    /// against the implementations.
    fn is_new(&self) -> bool;

    /// Page size information for this storage.
    fn info(&self) -> Box<dyn PageStorageInfo>;

    /// Allocate a page and return its page number.
    fn new_page(&mut self) -> u64;

    /// Drop page `pn`. Called when a page is set to empty data.
    fn drop_page(&mut self, pn: u64);

    /// Store `data` as the contents of page `pn`. Called with non-empty data.
    fn set_page(&mut self, pn: u64, data: Data);

    /// Fetch the contents of page `pn`.
    fn get_page(&self, pn: u64) -> Data;

    /// Size of page `pn`.
    /// NOTE(review): not called in this file; presumably the stored data
    /// length — confirm.
    fn size(&self, pn: u64) -> usize;

    /// Commit pending changes.
    fn save(&mut self);

    /// Roll back pending changes.
    fn rollback(&mut self);

    /// Wait for outstanding work to complete.
    /// NOTE(review): presumably waits for a previous save to reach the
    /// backing store — confirm.
    fn wait_complete(&self);

    /// Returns a set of page numbers together with a `u64`.
    /// NOTE(review): presumably the free pages and an allocation bound, used
    /// for verification — confirm.
    #[cfg(feature = "verify")]
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64);

    /// Renumber page `pn`, returning its new page number.
    #[cfg(feature = "renumber")]
    fn renumber(&mut self, pn: u64) -> u64;

    /// Load the free pages in preparation for renumbering.
    /// NOTE(review): meaning of the returned `Option<u64>` is not visible
    /// here — confirm.
    #[cfg(feature = "renumber")]
    fn load_free_pages(&mut self) -> Option<u64>;

    /// Set the page allocation number to `target`.
    /// NOTE(review): semantics inferred from the name only — confirm.
    #[cfg(feature = "renumber")]
    fn set_alloc_pn(&mut self, target: u64);
}
/// Information about the page sizes offered by a page storage.
///
/// Implementors supply [`sizes`](Self::sizes), [`index`](Self::index) and
/// [`size`](Self::size); the remaining methods are derived from those three.
pub trait PageStorageInfo: Send + Sync {
    /// Number of different page sizes.
    fn sizes(&self) -> usize;

    /// Size index for a page of the given `size`.
    fn index(&self, size: usize) -> usize;

    /// Page size for size index `ix`.
    fn size(&self, ix: usize) -> usize;

    /// Maximum page size: the size at index `sizes()`.
    fn max_size_page(&self) -> usize {
        self.size(self.sizes())
    }

    /// Page size for the index that holds half the maximum size less 50.
    ///
    /// The subtraction saturates at zero, so a storage whose maximum page
    /// size is below 100 yields the size for `index(0)` instead of
    /// underflowing.
    /// NOTE(review): the 50 margin is presumably headroom for per-page
    /// overhead — confirm.
    fn half_size_page(&self) -> usize {
        let target = (self.max_size_page() / 2).saturating_sub(50);
        self.size(self.index(target))
    }

    /// Whether shrinking a page of `size` by `saving` moves it to a smaller
    /// size index, i.e. whether compression is worthwhile.
    ///
    /// A `saving` larger than `size` is treated as shrinking to zero rather
    /// than underflowing.
    fn compress(&self, size: usize, saving: usize) -> bool {
        self.index(size.saturating_sub(saving)) < self.index(size)
    }
}
/// Index type for positions in the usage heap; `HX::MAX` marks "not in the heap".
type HX = u32;

/// Heap used to track page usage (keyed by page number, ordered by usage count).
type Heap = GHeap<u64, u64, HX>;

/// Shared, lockable handle to a [`PageInfo`].
type PageInfoPtr = Arc<Mutex<PageInfo>>;
/// Cached state for one logical page: its current data plus older versions
/// kept for readers.
pub struct PageInfo {
    /// Current data for the page, or `None` when it is not held in memory
    /// (never loaded, evicted, or set to empty data).
    pub current: Option<Data>,
    /// Older versions of the page, keyed by the time at which each was replaced.
    pub history: BTreeMap<u64, Data>,
    /// Number of times the page has been used.
    pub usage: u64,
    /// Position of the page in the usage heap, or `HX::MAX` when it is not
    /// in the heap.
    pub hx: HX,
}
impl PageInfo {
    /// Create a shareable `PageInfo` with no current data, no history, zero
    /// usage and no heap entry.
    fn new() -> PageInfoPtr {
        let info = PageInfo {
            current: None,
            history: BTreeMap::new(),
            usage: 0,
            hx: HX::MAX,
        };
        Arc::new(Mutex::new(info))
    }

    /// Count one more use of page `lpnum` and reflect the new count in `ah`.
    ///
    /// The page is inserted into the heap on first use (or after it was
    /// popped); otherwise its existing heap entry is modified.
    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
        self.usage += 1;
        if self.hx != HX::MAX {
            ah.modify(self.hx, self.usage);
        } else {
            self.hx = ah.insert(lpnum, self.usage);
        }
    }

    /// Get the data for page `lpnum` as seen by accessor `a`.
    ///
    /// A reader is served the first history entry whose time is at or after
    /// the reader's time, if one exists. Otherwise the current data is used,
    /// loading it from page storage when it is not in memory.
    ///
    /// Returns the data and the length newly loaded from storage (zero when
    /// nothing had to be read).
    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
        // Writers always see the current version; only readers consult history.
        let historic = if a.writer {
            None
        } else {
            self.history.range(a.time..).next()
        };
        if let Some((_, version)) = historic {
            return (version.clone(), 0);
        }

        if let Some(cached) = &self.current {
            return (cached.clone(), 0);
        }

        // Not in memory: read from page storage and remember the result.
        let loaded = a.spd.ps.read().unwrap().get_page(lpnum);
        let len = loaded.len();
        self.current = Some(loaded.clone());
        (loaded, len)
    }

    /// Replace the current data with `data`.
    ///
    /// When `do_history` is set, `old` is first recorded in the history
    /// under `time`. Empty `data` clears the current entry.
    ///
    /// Returns `(previous current length, new length)`.
    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
        if do_history {
            self.history.insert(time, old);
        }
        let before = self.current.as_ref().map_or(0, |d| d.len());
        let after = data.len();
        self.current = (after != 0).then_some(data);
        (before, after)
    }

    /// Remove the history entry for time `t` if no reader can need it.
    ///
    /// `start` is the start of the range of times that have no readers.
    /// Returns `true` when the entry had to be retained, `false` when it
    /// was removed.
    fn trim(&mut self, t: u64, start: u64) -> bool {
        let retained = self.history_start(t) < start;
        if !retained {
            self.history.remove(&t);
        }
        retained
    }

    /// Earliest time served by the history entry for `t`: one past the
    /// previous history key, or zero when there is no earlier entry.
    fn history_start(&self, t: u64) -> u64 {
        self.history
            .range(..t)
            .next_back()
            .map_or(0, |(k, _)| *k + 1)
    }
}
/// Central store of cached pages, their history, and reader bookkeeping.
#[derive(Default)]
pub struct Stash {
    /// Current write time; advanced by one each time a write ends.
    time: u64,
    /// Page info for each logical page number seen so far.
    pub pages: HashMap<u64, PageInfoPtr>,
    /// Number of active readers for each read time.
    rdrs: BTreeMap<u64, usize>,
    /// For each time, the set of pages updated at that time.
    vers: BTreeMap<u64, HashSet<u64>>,
    /// Running total of the lengths of cached page data.
    pub total: i64,
    /// Limit on `total`; `trim_cache` evicts pages while it is exceeded.
    pub mem_limit: usize,
    /// Heap of page usage counts from which `trim_cache` pops pages to evict.
    /// NOTE(review): presumably least-used first — confirm against `GHeap`.
    min: Heap,
    /// Number of page info look-ups performed.
    pub read: u64,
    /// Number of look-ups that had to load data from page storage.
    pub miss: u64,
}
impl Stash {
fn set(&mut self, lpnum: u64, old: Data, data: Data) {
let time = self.time;
let u = self.vers.entry(time).or_default();
let do_history = u.insert(lpnum);
let p = self.get_pinfo(lpnum);
let diff = p.lock().unwrap().set_data(time, old, data, do_history);
self.delta(diff, false, false);
}
fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
let p = self
.pages
.entry(lpnum)
.or_insert_with(PageInfo::new)
.clone();
p.lock().unwrap().inc_usage(lpnum, &mut self.min);
self.read += 1;
p
}
fn begin_read(&mut self) -> u64 {
let time = self.time;
let n = self.rdrs.entry(time).or_insert(0);
*n += 1;
time
}
fn end_read(&mut self, time: u64) {
let n = self.rdrs.get_mut(&time).unwrap();
*n -= 1;
if *n == 0 {
self.rdrs.remove(&time);
self.trim(time);
}
}
fn end_write(&mut self) -> usize {
let result = if let Some(u) = self.vers.get(&self.time) {
u.len()
} else {
0
};
let t = self.time;
self.time = t + 1;
self.trim(t);
result
}
fn trim(&mut self, time: u64) {
let (s, r) = (self.start(time), self.retain(time));
if s != r {
let mut empty = Vec::<u64>::new();
for (t, pl) in self.vers.range_mut(s..r) {
pl.retain(|pnum| {
let p = self.pages.get(pnum).unwrap();
p.lock().unwrap().trim(*t, s)
});
if pl.is_empty() {
empty.push(*t);
}
}
for t in empty {
self.vers.remove(&t);
}
}
}
fn start(&self, time: u64) -> u64 {
if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
1 + *t
} else {
0
}
}
fn retain(&self, time: u64) -> u64 {
if let Some((t, _n)) = self.rdrs.range(time..).next() {
*t
} else {
self.time
}
}
fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
if miss {
self.miss += 1;
}
self.total += d.1 as i64 - d.0 as i64;
if trim {
self.trim_cache();
}
}
pub fn trim_cache(&mut self) {
while self.total > self.mem_limit as i64 && self.min.len() > 0 {
let lpnum = self.min.pop();
let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
p.hx = HX::MAX;
if let Some(data) = &p.current {
self.total -= data.len() as i64;
p.current = None;
}
}
}
pub fn cached(&self) -> usize {
self.min.len() as usize
}
}
/// Paged data shared between one writer and any number of readers.
pub struct SharedPagedData {
    /// Underlying page storage.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Cached pages, page history and reader bookkeeping.
    pub stash: Mutex<Stash>,
    /// Page size information, obtained from the storage at construction.
    pub psi: Box<dyn PageStorageInfo>,
}
impl SharedPagedData {
    /// Construct shared paged data on top of `stg`, using a `BlockPageStg`
    /// built with the default [`Limits`].
    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
        let ps = crate::blockpagestg::BlockPageStg::new(stg, &crate::Limits::default());
        Self::new_from_ps(ps)
    }

    /// Construct shared paged data on top of the given page storage.
    ///
    /// The stash starts empty with a memory limit of 10 MiB.
    pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
        const DEFAULT_MEM_LIMIT: usize = 10 * 1024 * 1024;

        let psi = ps.info();
        let stash = Stash {
            mem_limit: DEFAULT_MEM_LIMIT,
            ..Default::default()
        };
        Arc::new(Self {
            ps: RwLock::new(ps),
            stash: Mutex::new(stash),
            psi,
        })
    }

    /// Create a read-only accessor pinned to the current time.
    ///
    /// The reader is registered with the stash until the accessor is dropped.
    pub fn new_reader(self: &Arc<Self>) -> AccessPagedData {
        let time = self.stash.lock().unwrap().begin_read();
        AccessPagedData {
            writer: false,
            time,
            spd: Arc::clone(self),
        }
    }

    /// Create a writing accessor.
    pub fn new_writer(self: &Arc<Self>) -> AccessPagedData {
        AccessPagedData {
            writer: true,
            time: 0,
            spd: Arc::clone(self),
        }
    }

    /// Wait for the underlying page storage to complete outstanding work.
    pub fn wait_complete(&self) {
        let ps = self.ps.read().unwrap();
        ps.wait_complete();
    }
}
/// Access to shared paged data, either as the writer or as a reader pinned
/// to the time at which it was created.
pub struct AccessPagedData {
    /// `true` for the writing accessor, `false` for a reader.
    writer: bool,
    /// For a reader, the time at which the read began; zero for a writer.
    time: u64,
    /// The shared paged data being accessed.
    pub spd: Arc<SharedPagedData>,
}
impl AccessPagedData {
    /// Create a read-only accessor pinned to the current time.
    #[deprecated(note = "use SharedPagedData::new_reader instead")]
    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
        let time = spd.stash.lock().unwrap().begin_read();
        Self {
            writer: false,
            time,
            spd,
        }
    }

    /// Create a writing accessor.
    #[deprecated(note = "use SharedPagedData::new_writer instead")]
    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
        Self {
            writer: true,
            time: 0,
            spd,
        }
    }

    /// Lock the shared stash and return the guard.
    ///
    /// # Panics
    /// Panics if the stash mutex is poisoned.
    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
        self.spd.stash.lock().unwrap()
    }

    /// Get the data for page `lpnum` as seen by this accessor.
    ///
    /// If the data had to be loaded from storage, the load is counted as a
    /// cache miss and the cache is trimmed to its limit.
    pub fn get_data(&self, lpnum: u64) -> Data {
        // The stash guard is released before the page lock is taken.
        let pinfo = self.stash().get_pinfo(lpnum);
        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
        if loaded != 0 {
            self.stash().delta((0, loaded), true, true);
        }
        data
    }

    /// Set the data for page `lpnum`.
    ///
    /// The stash is updated first, keeping the previous data as history;
    /// the data is then written to page storage. Empty data drops the page
    /// from storage.
    pub fn set_data(&self, lpnum: u64, data: Data) {
        debug_assert!(self.writer);

        // Fetch the current data so it can be kept as history.
        let pinfo = self.stash().get_pinfo(lpnum);
        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // Update the stash under a single lock.
        {
            let mut stash = self.stash();
            if loaded != 0 {
                stash.delta((0, loaded), true, false);
            }
            stash.set(lpnum, old, data.clone());
            stash.trim_cache();
        }

        // Write through to the underlying storage.
        let mut ps = self.spd.ps.write().unwrap();
        if data.is_empty() {
            ps.drop_page(lpnum);
        } else {
            ps.set_page(lpnum, data);
        }
    }

    /// Allocate a new page and return its page number.
    pub fn alloc_page(&self) -> u64 {
        debug_assert!(self.writer);
        let mut ps = self.spd.ps.write().unwrap();
        ps.new_page()
    }

    /// Free page `lpnum` by setting it to empty data.
    pub fn free_page(&self, lpnum: u64) {
        self.set_data(lpnum, Data::default());
    }

    /// Whether this is the writer and the underlying storage reports itself
    /// as new.
    pub fn is_new(&self) -> bool {
        if !self.writer {
            return false;
        }
        self.spd.ps.read().unwrap().is_new()
    }

    /// Whether reducing a page of `size` by `saving` is worthwhile, as
    /// judged by the storage's page size information.
    pub fn compress(&self, size: usize, saving: usize) -> bool {
        debug_assert!(self.writer);
        self.spd.psi.compress(size, saving)
    }

    /// Commit or roll back, according to `op`.
    ///
    /// Returns the number of pages updated for `SaveOp::Save` and zero for
    /// `SaveOp::RollBack`.
    pub fn save(&self, op: SaveOp) -> usize {
        debug_assert!(self.writer);
        // The storage guard is released at the end of each statement, before
        // the stash is locked.
        match op {
            SaveOp::RollBack => {
                self.spd.ps.write().unwrap().rollback();
                0
            }
            SaveOp::Save => {
                self.spd.ps.write().unwrap().save();
                self.stash().end_write()
            }
        }
    }

    /// Renumber page `lpnum`, returning its new page number.
    ///
    /// In the stash the old page becomes empty and the new page receives
    /// the data.
    #[cfg(feature = "renumber")]
    pub fn renumber_page(&self, lpnum: u64) -> u64 {
        assert!(self.writer);
        let moved = self.get_data(lpnum);
        self.stash().set(lpnum, moved.clone(), Data::default());
        let new_pn = self.spd.ps.write().unwrap().renumber(lpnum);
        debug_assert!(
            self.stash()
                .get_pinfo(new_pn)
                .lock()
                .unwrap()
                .current
                .is_none()
        );
        let previous = self.get_data(new_pn);
        self.stash().set(new_pn, previous, moved);
        new_pn
    }
}
impl Drop for AccessPagedData {
    /// A reader deregisters itself from the stash when dropped; the writer
    /// has nothing to release.
    fn drop(&mut self) {
        if self.writer {
            return;
        }
        let started = self.time;
        self.stash().end_read(started);
    }
}
/// Tunable limits passed to `BlockPageStg::new`.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names; confirm against `blockpagestg`.
#[non_exhaustive]
pub struct Limits {
    /// Limits for the underlying `atom_file` storage.
    pub af_lim: atom_file::Limits,
    /// Block capacity (presumably in bytes).
    pub blk_cap: u64,
    /// Number of different page sizes (presumably).
    pub page_sizes: usize,
    /// Maximum number of pages a block is divided into (presumably).
    pub max_div: usize,
}
impl Default for Limits {
    /// Default limits: default `atom_file` limits, `blk_cap` 27720,
    /// 7 page sizes and `max_div` 12.
    fn default() -> Self {
        let af_lim = atom_file::Limits::default();
        Self {
            af_lim,
            // NOTE(review): 27720 is the least common multiple of 1..=12,
            // which matches `max_div` — presumably intentional; confirm.
            blk_cap: 27720,
            page_sizes: 7,
            max_div: 12,
        }
    }
}