pub(crate) mod buffer;
pub(crate) mod cache;
pub(crate) mod data;
pub(crate) mod evictor;
pub(crate) mod flow;
pub(crate) mod publish;
pub(crate) mod flush;
pub mod table;
use std::sync::Arc;
use dashmap::{DashMap, Entry};
use rustc_hash::FxHasher;
use std::hash::BuildHasherDefault;
use crate::{
OpCode, Options,
cc::context::Context,
map::buffer::Pool,
static_assert,
types::{
refbox::{BoxRef, BoxView},
traits::{IHeader, ILoader},
},
utils::{
Handle, MutRef,
data::{AddrPair, GroupPositions, Interval, Position},
lru::{CachePriority, ShardPriorityLru},
},
};
/// Concurrent map from a `u64` key to a [`BoxRef`], hashed with `FxHasher`.
/// NOTE(review): the key is presumably a page address — confirm against callers.
pub(crate) type PagesMap = DashMap<u64, BoxRef, BuildHasherDefault<FxHasher>>;
/// Concurrent map from a `u64` key to a [`RetiredChain`], hashed with `FxHasher`.
pub(crate) type JunksMap = DashMap<u64, RetiredChain, BuildHasherDefault<FxHasher>>;
/// `crate::types::node::Node` specialised to this module's [`Loader`].
pub(crate) type Node = crate::types::node::Node<Loader>;
/// `crate::types::page::Page` specialised to this module's [`Loader`].
pub(crate) type Page = crate::types::page::Page<Loader>;
/// 128-bit cache key that packs a bucket id and an address into one integer.
///
/// The bucket id occupies the upper 64 bits and the address the lower 64 bits,
/// so two different `(bucket_id, addr)` pairs never produce the same key.
#[derive(Clone, Copy)]
struct CacheKey(u128);

impl CacheKey {
    /// Builds the key for `addr` inside the bucket `bucket_id`.
    const fn new(bucket_id: u64, addr: u64) -> Self {
        let high = (bucket_id as u128) << 64;
        let low = addr as u128;
        Self(high | low)
    }

    /// Returns the packed 128-bit value.
    const fn raw(self) -> u128 {
        let Self(bits) = self;
        bits
    }
}
/// Sparse set of per-group positions.
///
/// Only groups that have been merged at least once are stored. `mask` holds
/// one bit per stored group so that membership can be tested without scanning
/// `items`.
#[derive(Default, Clone)]
pub(crate) struct SparseFrontier {
    /// Bit `g` is set exactly when `items` contains an entry for group `g`.
    mask: u128,
    /// `(group, position)` pairs in the order the groups were first merged.
    items: Vec<(u8, Position)>,
}

impl SparseFrontier {
    /// Returns `true` when no group has been recorded yet.
    pub(crate) fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Records `lsn` for `group`, keeping the larger of the stored and the new
    /// position when the group is already present.
    ///
    /// `group` is used as a shift amount on the 128-bit mask; the static
    /// assertion ties `Options::MAX_CONCURRENT_WRITE` to that width.
    ///
    /// # Panics
    ///
    /// Panics if the mask says the group is present but `items` has no entry
    /// for it, which would mean the two fields went out of sync.
    pub(crate) fn merge_group(&mut self, group: u8, lsn: Position) {
        static_assert!(Options::MAX_CONCURRENT_WRITE <= 128);
        let bit = 1u128 << group;

        if self.mask & bit != 0 {
            // Known group: raise its stored position if the new one is larger.
            match self.items.iter_mut().find(|(g, _)| *g == group) {
                Some((_, pos)) => *pos = (*pos).max(lsn),
                None => unreachable!("frontier mask/item mismatch for group {}", group),
            }
            return;
        }

        // First time this group is seen: mark it and append its position.
        self.mask |= bit;
        self.items.push((group, lsn));
    }

    /// Merges every `(group, position)` pair of `other` into `self`.
    pub(crate) fn merge_sparse(&mut self, other: &SparseFrontier) {
        other
            .items
            .iter()
            .for_each(|&(group, lsn)| self.merge_group(group, lsn));
    }

    /// Raises each recorded group's slot in `dst` to at least the stored
    /// position; slots of groups not recorded here are left untouched.
    pub(crate) fn apply_to(&self, dst: &mut GroupPositions) {
        for (group, lsn) in self.items.iter().copied() {
            let slot = group as usize;
            debug_assert!(slot < dst.len());
            let merged = dst[slot].max(lsn);
            dst[slot] = merged;
        }
    }
}
/// A list of addresses bundled with a [`SparseFrontier`]; the value type of
/// [`JunksMap`].
/// NOTE(review): presumably the addresses of a retired page chain plus the
/// positions that must be reached before they can be reclaimed — confirm.
#[derive(Default, Clone)]
pub(crate) struct RetiredChain {
/// Addresses belonging to this chain.
pub(crate) addrs: Vec<u64>,
/// Per-group positions associated with this chain.
pub(crate) frontier: SparseFrontier,
}
/// Page loader for one bucket; implements [`ILoader`].
///
/// Lookups consult, depending on the method, the pinned-node map, the shared
/// LRU cache, the dirty pages held by the buffer pool, and finally the
/// pluggable [`IDataReader`].
pub struct Loader {
/// Buffer pool; queried through `get_dirty_page` before falling back to `reader`.
pub(crate) pool: Handle<Pool>,
/// Context handle; not used by the methods in this file section.
pub(crate) ctx: Handle<Context>,
/// LRU cache of loaded pages, keyed by the 128-bit value from `cache_key`.
pub(crate) lru: Handle<ShardPriorityLru<BoxRef>>,
/// Pages pinned by this loader, keyed by address.
pub(crate) node_pins: MutRef<DashMap<u64, BoxRef>>,
/// Bucket this loader reads from; part of every cache key and reader call.
pub(crate) bucket_id: u64,
/// Backend used when a page is found neither pinned, cached, nor dirty.
pub(crate) reader: Arc<dyn IDataReader>,
}
/// No-op destructor: no explicit teardown is performed here.
/// NOTE(review): merely having a `Drop` impl forbids moving fields out of a
/// `Loader` by destructuring; presumably that is the intent — confirm before
/// removing.
impl Drop for Loader {
fn drop(&mut self) {}
}
impl Loader {
    /// Pin capacity constant.
    /// NOTE(review): not referenced in this part of the file; presumably an
    /// upper bound on `node_pins` enforced by callers — confirm.
    pub const PIN_CAP: usize = 64;

    /// Builds the LRU key for `addr` within this loader's bucket.
    #[inline]
    fn cache_key(&self, addr: u64) -> u128 {
        let key = CacheKey::new(self.bucket_id, addr);
        key.raw()
    }
}
impl ILoader for Loader {
fn copy_detached(&self) -> Self {
Self {
pool: self.pool,
ctx: self.ctx,
lru: self.lru,
node_pins: MutRef::new(DashMap::new()),
bucket_id: self.bucket_id,
reader: self.reader.clone(),
}
}
fn copy(&self) -> Self {
Self {
pool: self.pool,
ctx: self.ctx,
lru: self.lru,
node_pins: self.node_pins.clone(),
bucket_id: self.bucket_id,
reader: self.reader.clone(),
}
}
fn pin(&self, data: BoxRef) {
self.node_pins.insert(data.header().addr, data);
}
fn load_pinned(&self, addr: u64) -> Result<BoxView, OpCode> {
if let Some(p) = self.node_pins.get(&addr) {
return Ok(p.view());
}
let x = if let Some(x) = self.pool.get_dirty_page(addr) {
x
} else {
self.reader.load_data(self.bucket_id, addr, &|_| {})?
};
let e = self.node_pins.entry(addr);
match e {
Entry::Occupied(o) => Ok(o.get().view()),
Entry::Vacant(v) => {
let r = x.view();
v.insert(x);
Ok(r)
}
}
}
fn load_sibling(&self, addr: u64) -> Result<BoxRef, OpCode> {
let key = self.cache_key(addr);
if let Some(x) = self.lru.get(key) {
return Ok(x.clone());
}
if let Some(x) = self.pool.get_dirty_page(addr) {
return Ok(x);
}
self.reader.load_data(self.bucket_id, addr, &|b| {
self.lru
.add(CachePriority::High, key, b.header().total_size as usize, b);
})
}
fn load_blob(&self, addr: u64, cache: bool) -> Result<BoxRef, OpCode> {
let key = self.cache_key(addr);
if cache && let Some(x) = self.lru.get(key) {
return Ok(x.clone());
}
if let Some(x) = self.pool.get_dirty_page(addr) {
return Ok(x);
}
self.reader.load_blob(self.bucket_id, addr, &|b| {
if cache {
self.lru.add(
CachePriority::Low,
key,
b.header().total_size as usize,
b.to_box(),
);
}
})
}
}
/// Two-variant state/message enum.
/// NOTE(review): presumably signals sent to a background worker such as the
/// evictor; the consumers are not visible in this part of the file — confirm.
pub(crate) enum SharedState {
/// Quit signal.
Quit,
/// Eviction signal carrying a `u64` payload whose meaning is not visible here.
Evict(u64),
}
/// Describes a footer that records how many interval and relocation entries
/// it refers to, together with a CRC value for each of the two sections.
pub(crate) trait IFooter: Default {
    /// Size in bytes of the implementing footer type.
    const LEN: usize = std::mem::size_of::<Self>();

    /// Number of `Interval` entries.
    fn nr_interval(&self) -> usize;

    /// Number of `AddrPair` relocation entries.
    fn nr_reloc(&self) -> usize;

    /// CRC value for the relocation entries.
    fn reloc_crc(&self) -> u32;

    /// CRC value for the interval entries.
    fn interval_crc(&self) -> u32;

    /// Total length of the interval entries: entry count times `Interval::LEN`.
    fn interval_len(&self) -> usize {
        let count = self.nr_interval();
        count * Interval::LEN
    }

    /// Total length of the relocation entries: entry count times `AddrPair::LEN`.
    fn reloc_len(&self) -> usize {
        let count = self.nr_reloc();
        count * AddrPair::LEN
    }
}
/// Backend that fetches pages and blobs by `(bucket_id, addr)`.
///
/// Implementations must be `Send + Sync`; [`Loader`] holds one behind an
/// `Arc<dyn IDataReader>`.
pub trait IDataReader: Send + Sync {
/// Loads the data page at `addr` in bucket `bucket_id`.
///
/// `cache` is a callback taking a [`BoxRef`]; [`Loader`] passes one that
/// inserts the page into its LRU, or a no-op. When, and how often, the
/// callback is invoked is up to the implementation.
///
/// Returns the loaded page, or an `OpCode` on failure.
fn load_data(
&self,
bucket_id: u64,
addr: u64,
cache: &dyn Fn(BoxRef),
) -> Result<BoxRef, OpCode>;
/// Loads the blob at `addr` in bucket `bucket_id`.
///
/// `cache` is a callback taking a [`BoxView`]; [`Loader`] passes one that
/// optionally inserts the blob into its LRU. When, and how often, the
/// callback is invoked is up to the implementation.
///
/// Returns the loaded blob, or an `OpCode` on failure.
fn load_blob(
&self,
bucket_id: u64,
addr: u64,
cache: &dyn Fn(BoxView),
) -> Result<BoxRef, OpCode>;
}