use std::{
ops::Deref,
sync::{
atomic::{
AtomicUsize,
Ordering::{Acquire, Relaxed, Release},
},
Arc, Mutex, RwLock,
},
};
use pagecache::{FastMap8, PagePtr};
use super::*;
#[derive(Clone)]
pub struct Db {
config: Config,
pages: Arc<PageCache<BLinkMaterializer, Frag, Recovery>>,
idgen: Arc<AtomicUsize>,
idgen_persists: Arc<AtomicUsize>,
idgen_persist_mu: Arc<Mutex<()>>,
tenants: Arc<RwLock<FastMap8<Vec<u8>, Arc<Tree>>>>,
default: Arc<Tree>,
transactions: Arc<Tree>,
was_recovered: bool,
}
unsafe impl Send for Db {}
unsafe impl Sync for Db {}
impl Drop for Db {
fn drop(&mut self) {
if let Err(e) = self.pages.flush() {
error!(
"failed to flush underlying \
pagecache during drop: {:?}",
e
);
}
#[cfg(feature = "event_log")]
{
let guard = pin();
self.config.event_log.meta_before_restart(
meta::meta(&*self.pages, &guard)
.expect("should get meta under test")
.clone(),
);
}
}
}
impl Deref for Db {
type Target = Tree;
fn deref(&self) -> &Tree {
&self.default
}
}
impl Db {
pub fn start_default<P: AsRef<std::path::Path>>(
path: P,
) -> Result<Db, ()> {
let config = ConfigBuilder::new().path(path).build();
Self::start(config)
}
pub fn start(config: Config) -> Result<Db, ()> {
let _measure = Measure::new(&M.tree_start);
#[cfg(any(test, feature = "check_snapshot_integrity"))]
match config
.verify_snapshot::<BLinkMaterializer, Frag, Recovery>()
{
Ok(_) => {}
#[cfg(feature = "failpoints")]
Err(Error::FailPoint) => {}
other => panic!("failed to verify snapshot: {:?}", other),
}
let pages = Arc::new(PageCache::start(config.clone())?);
let recovery: Recovery =
pages.recovered_state().unwrap_or_default();
let idgen_recovery =
recovery.counter + (2 * config.idgen_persist_interval);
let idgen_persists = recovery.counter
/ config.idgen_persist_interval
* config.idgen_persist_interval;
let guard = pin();
let was_recovered: bool;
if pages
.get(META_PID, &guard)
.map_err(|e| e.danger_cast())?
.is_unallocated()
{
was_recovered = false;
let meta_id = pages.allocate(&guard)?;
assert_eq!(
meta_id,
META_PID,
"we expect the meta page to have pid {}, but it had pid {} instead",
META_PID,
meta_id,
);
let meta = Frag::Meta(Meta::default());
pages
.replace(meta_id, PagePtr::allocated(), meta, &guard)
.map_err(|e| e.danger_cast())?;
let counter_id = pages.allocate(&guard)?;
assert_eq!(
counter_id,
COUNTER_PID,
"we expect the counter to have pid {}, but it had pid {} instead",
COUNTER_PID,
counter_id,
);
let counter = Frag::Counter(0);
pages
.replace(
counter_id,
PagePtr::allocated(),
counter,
&guard,
)
.map_err(|e| e.danger_cast())?;
} else {
was_recovered = true;
};
let default_tree = meta::open_tree(
pages.clone(),
config.clone(),
DEFAULT_TREE_ID.to_vec(),
&guard,
)?;
let tx_tree = meta::open_tree(
pages.clone(),
config.clone(),
TX_TREE_ID.to_vec(),
&guard,
)?;
let ret = Db {
pages: pages,
config,
idgen: Arc::new(AtomicUsize::new(idgen_recovery)),
idgen_persists: Arc::new(AtomicUsize::new(
idgen_persists,
)),
idgen_persist_mu: Arc::new(Mutex::new(())),
tenants: Arc::new(RwLock::new(FastMap8::default())),
default: Arc::new(default_tree),
transactions: Arc::new(tx_tree),
was_recovered,
};
#[cfg(feature = "event_log")]
ret.config.event_log.meta_after_restart(
meta::meta(&*ret.pages, &guard)
.expect("should be able to get meta under test")
.clone(),
);
let mut tenants = ret.tenants.write().unwrap();
for (id, _root) in
meta::meta(&*ret.pages, &guard)?.tenants().into_iter()
{
let tree = Tree {
tree_id: id.clone(),
subscriptions: Arc::new(Subscriptions::default()),
config: ret.config.clone(),
pages: ret.pages.clone(),
};
tenants.insert(id, Arc::new(tree));
}
drop(tenants);
let mut ret = ret;
ret.default = ret.open_tree(DEFAULT_TREE_ID.to_vec())?;
ret.transactions = ret.open_tree(TX_TREE_ID.to_vec())?;
Ok(ret)
}
pub fn generate_id(&self) -> Result<usize, ()> {
let ret = self.idgen.fetch_add(1, Relaxed);
let interval = self.config.idgen_persist_interval;
let necessary_persists = ret / interval * interval;
let mut persisted = self.idgen_persists.load(Acquire);
while persisted < necessary_persists {
let _mu = self.idgen_persist_mu.lock().unwrap();
persisted = self.idgen_persists.load(Acquire);
if persisted < necessary_persists {
let guard = pin();
let (current, key) = self
.pages
.get(COUNTER_PID, &guard)
.map_err(|e| e.danger_cast())?
.unwrap();
if let Frag::Counter(current) = current {
assert_eq!(*current, persisted);
} else {
panic!(
"counter pid contained non-Counter: {:?}",
current
);
}
let counter_frag = Frag::Counter(necessary_persists);
let old = self
.idgen_persists
.swap(necessary_persists, Release);
assert_eq!(old, persisted);
self.pages
.replace(
COUNTER_PID,
key.clone(),
counter_frag,
&guard,
)
.map_err(|e| e.danger_cast())?;
if key.last_lsn() > self.pages.stable_lsn() {
self.pages.make_stable(key.last_lsn())?;
}
guard.flush();
}
}
Ok(ret)
}
pub fn open_tree(&self, name: Vec<u8>) -> Result<Arc<Tree>, ()> {
let guard = pin();
let tenants = self.tenants.read().unwrap();
if let Some(tree) = tenants.get(&name) {
return Ok(tree.clone());
}
drop(tenants);
let leaf_id = self.pages.allocate(&guard)?;
trace!(
"allocated pid {} for leaf in open_tree for namespace {:?}",
leaf_id,
name,
);
let leaf = Frag::Base(Node {
id: leaf_id,
data: Data::Leaf(vec![]),
next: None,
lo: vec![].into(),
hi: vec![].into(),
});
self.pages
.replace(leaf_id, PagePtr::allocated(), leaf, &guard)
.map_err(|e| e.danger_cast())?;
let root_id = self.pages.allocate(&guard)?;
debug!(
"allocated pid {} for root of new_tree {:?}",
root_id, name
);
let root_index_vec = vec![(vec![0].into(), leaf_id)];
let root = Frag::Base(Node {
id: root_id,
data: Data::Index(root_index_vec),
next: None,
lo: vec![].into(),
hi: vec![].into(),
});
self.pages
.replace(root_id, PagePtr::allocated(), root, &guard)
.map_err(|e| e.danger_cast())?;
meta::cas_root(
&*self.pages,
name.clone(),
None,
Some(root_id),
&guard,
)
.map_err(|e| e.danger_cast())?;
let mut tenants = self.tenants.write().unwrap();
let tree = Arc::new(Tree {
tree_id: name.clone(),
subscriptions: Arc::new(Subscriptions::default()),
config: self.config.clone(),
pages: self.pages.clone(),
});
tenants.insert(name, tree.clone());
Ok(tree)
}
pub fn drop_tree(&self, name: &[u8]) -> Result<bool, ()> {
if name == DEFAULT_TREE_ID || name == TX_TREE_ID {
return Err(Error::Unsupported(
"cannot remove the core structures".into(),
));
}
trace!("dropping tree {:?}", name,);
let mut tenants = self.tenants.write().unwrap();
let tree = if let Some(tree) = tenants.remove(&*name) {
tree
} else {
return Ok(false);
};
let guard = pin();
let mut root_id =
meta::pid_for_name(&*self.pages, &name, &guard)?;
let leftmost_chain: Vec<PageId> = tree
.path_for_key(b"", &guard)?
.into_iter()
.map(|(frag, _tp)| frag.unwrap_base().id)
.collect();
loop {
let res = meta::cas_root(
&*self.pages,
name.to_vec(),
Some(root_id),
None,
&guard,
)
.map_err(|e| e.danger_cast());
match res {
Ok(_) => break,
Err(Error::CasFailed(actual)) => root_id = actual,
Err(other) => return Err(other.danger_cast()),
}
}
drop(tenants);
guard.defer(move || tree.gc_pages(leftmost_chain));
guard.flush();
Ok(true)
}
pub fn was_recovered(&self) -> bool {
self.was_recovered
}
pub fn mark_pending_tx(
&self,
_pages: Vec<PageId>,
_txid: u64,
) -> Result<(), ()> {
unimplemented!()
}
pub fn abort_tx(&self, _txid: u64) -> Result<(), ()> {
unimplemented!()
}
pub fn commit_tx(&self, _txid: u64) -> Result<(), ()> {
unimplemented!()
}
pub fn cleanup_tx(&self, _txid: u64) -> Result<(), ()> {
unimplemented!()
}
pub fn possibly_dirty_pages(
&self,
) -> Result<Vec<(u64, Vec<PageId>)>, ()> {
unimplemented!()
}
}