use crate::cc::context::Context;
use crate::index::tree::Tree;
pub use crate::index::txn::{TxnKV, TxnView};
use crate::map::buffer::DataReader;
use crate::map::evictor::Evictor;
use crate::map::flush::{FlushDirective, FlushObserver, FlushResult};
use crate::meta::builder::ManifestBuilder;
use crate::meta::{BucketMeta, Manifest, MetaKind, PendingRangeKind};
use crate::store::gc::{GCHandle, start_gc};
use crate::store::recovery::Recovery;
use crate::store::{META_VACUUM_TARGET_BYTES, MetaVacuumStats, VacuumStats};
use crate::types::refbox::BoxRef;
use crate::utils::Handle;
use crate::utils::MutRef;
pub use crate::utils::OpCode;
use crate::utils::ROOT_PID;
use crate::utils::observe::CounterMetric;
pub use crate::utils::options::Options;
use crate::utils::options::ParsedOptions;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::mpsc::channel;
/// `FlushObserver` implementation that publishes flush results into the
/// manifest and coordinates WAL durability with the runtime context.
struct StoreFlushObserver {
    /// Shared manifest that flush results are committed into.
    manifest: Handle<Manifest>,
    /// Runtime context providing the WAL logging groups and options.
    ctx: Handle<Context>,
}
/// `DataReader` implementation backed by the manifest's file mappings.
struct StoreDataReader {
    /// Manifest used to resolve (bucket, address) pairs to stored boxes.
    meta: Handle<Manifest>,
}
impl StoreDataReader {
    /// Creates a reader over the given manifest handle.
    fn new(meta: Handle<Manifest>) -> Self {
        Self { meta }
    }

    /// Loads a data page at `addr` in `bucket_id`, forwarding the `cache`
    /// callback to the manifest (presumably so loaded boxes can be cached
    /// by the caller — confirm against `Manifest::load_data`).
    fn read_data(
        &self,
        bucket_id: u64,
        addr: u64,
        cache: &dyn Fn(BoxRef),
    ) -> Result<BoxRef, OpCode> {
        self.meta.load_data(bucket_id, addr, cache)
    }

    /// Loads a blob at `addr` in `bucket_id`, forwarding the `cache`
    /// callback to the manifest.
    fn read_blob(
        &self,
        bucket_id: u64,
        addr: u64,
        cache: &dyn Fn(BoxRef),
    ) -> Result<BoxRef, OpCode> {
        self.meta.load_blob(bucket_id, addr, cache)
    }

    /// Loads a blob without any cache callback involved.
    fn read_blob_uncached(&self, bucket_id: u64, addr: u64) -> Result<BoxRef, OpCode> {
        self.meta.load_blob_uncached(bucket_id, addr)
    }
}
/// Thin `DataReader` facade: every trait method forwards to the matching
/// inherent reader on `StoreDataReader`.
impl DataReader for StoreDataReader {
    /// Read a data page at `at` from `bucket`, forwarding `cache` unchanged.
    fn load_data(
        &self,
        bucket: u64,
        at: u64,
        cache: &dyn Fn(BoxRef),
    ) -> Result<BoxRef, OpCode> {
        self.read_data(bucket, at, cache)
    }

    /// Read a blob at `at` from `bucket`, forwarding `cache` unchanged.
    fn load_blob(
        &self,
        bucket: u64,
        at: u64,
        cache: &dyn Fn(BoxRef),
    ) -> Result<BoxRef, OpCode> {
        self.read_blob(bucket, at, cache)
    }

    /// Read a blob at `at` from `bucket` without touching any cache.
    fn load_blob_uncached(&self, bucket: u64, at: u64) -> Result<BoxRef, OpCode> {
        self.read_blob_uncached(bucket, at)
    }
}
impl StoreFlushObserver {
/// Creates an observer bound to the given manifest and context handles.
fn new(manifest: Handle<Manifest>, ctx: Handle<Context>) -> Self {
    Self { manifest, ctx }
}
/// Sorts and deduplicates both vectors, then removes every value present
/// in both (the set intersection) from each of them, in place.
///
/// Postcondition: `lhs` and `rhs` are sorted, duplicate-free, and disjoint.
///
/// Fix: the early return for an empty side previously skipped the
/// sort+dedup of the non-empty side, so callers could still observe
/// duplicates there (e.g. duplicate pending-sibling records downstream).
/// Both sides are now normalized before the emptiness check.
fn remove_sorted_intersection(lhs: &mut Vec<u64>, rhs: &mut Vec<u64>) {
    // Normalize both sides first so the postcondition holds even when the
    // merge pass below is skipped.
    lhs.sort_unstable();
    lhs.dedup();
    rhs.sort_unstable();
    rhs.dedup();
    if lhs.is_empty() || rhs.is_empty() {
        return;
    }
    // Single merge pass over the two sorted vectors: elements unique to a
    // side are compacted toward its front via write indices wl/wr; common
    // elements are dropped from both.
    let mut i = 0;
    let mut j = 0;
    let mut wl = 0;
    let mut wr = 0;
    while i < lhs.len() && j < rhs.len() {
        let x = lhs[i];
        let y = rhs[j];
        if x < y {
            lhs[wl] = x;
            wl += 1;
            i += 1;
        } else if x > y {
            rhs[wr] = y;
            wr += 1;
            j += 1;
        } else {
            // Present in both: skip on both sides.
            i += 1;
            j += 1;
        }
    }
    // Copy whichever tail remains; at most one of these loops runs.
    while i < lhs.len() {
        lhs[wl] = lhs[i];
        wl += 1;
        i += 1;
    }
    while j < rhs.len() {
        rhs[wr] = rhs[j];
        wr += 1;
        j += 1;
    }
    lhs.truncate(wl);
    rhs.truncate(wr);
}
/// Durably publishes one flush result: registers the flushed address
/// ranges in the in-memory interval maps, syncs the WAL up to the flushed
/// LSNs, records all metadata changes in a single manifest transaction,
/// and finally advances each logging group's checkpoint.
fn publish_one(&self, mut result: FlushResult) -> Result<(), OpCode> {
    let groups = self.ctx.groups();
    // One flushed-LSN entry is expected per logging group.
    debug_assert_eq!(result.flsn.len(), groups.len());
    let ctx = self
        .manifest
        .buckets
        .buckets
        .get(&result.bucket_id)
        .expect("bucket context must exist when flushing");
    // Register the freshly written data/blob address ranges so address
    // lookups can resolve them to the new file ids.
    if let Some(data_ivl) = &result.data_ivl {
        ctx.data_intervals
            .write()
            .insert(data_ivl.lo_addr, data_ivl.hi_addr, data_ivl.file_id);
    }
    if let Some(blob_ivl) = result.blob_ivl {
        ctx.blob_intervals
            .write()
            .insert(blob_ivl.lo_addr, blob_ivl.hi_addr, blob_ivl.file_id);
    }
    // Addresses that this flush both stages as pending and publishes
    // cancel out; only the symmetric difference is recorded below.
    let mut stage_pending_addrs = std::mem::take(&mut result.pending_sibling_addrs);
    let mut clear_pending_addrs = std::mem::take(&mut result.published_sibling_addrs);
    Self::remove_sorted_intersection(&mut stage_pending_addrs, &mut clear_pending_addrs);
    // NOTE(review): `release` fires before the manifest commit while
    // `mark_done` fires after it — presumably early wake-up of waiters vs.
    // final completion; confirm against the `done` handle's contract.
    result.done.release();
    // Make each group's WAL durable up to its flushed LSN before the
    // manifest transaction commits.
    for (i, g) in groups.iter().enumerate() {
        let target = result.flsn[i];
        let mut lk = g.logging.lock();
        let ok = lk.should_durable(target);
        if ok {
            // Clone the writer and drop the lock so the fsync does not
            // block concurrent log writers.
            let mut f = lk.writer.clone();
            drop(lk);
            f.sync();
        }
    }
    // All metadata updates for this flush go into one transaction.
    let mut txn = self.manifest.begin();
    // Superseded data/blob addresses become pending retires, to be
    // reclaimed later.
    for &addr in &result.data_junks {
        self.manifest.record_pending_retire(
            &mut txn,
            &crate::meta::PendingRetire::new(result.bucket_id, PendingRangeKind::Data, addr),
        );
    }
    for &addr in &result.blob_junks {
        self.manifest.record_pending_retire(
            &mut txn,
            &crate::meta::PendingRetire::new(result.bucket_id, PendingRangeKind::Blob, addr),
        );
    }
    let retire_recorded = (result.data_junks.len() + result.blob_junks.len()) as u64;
    if retire_recorded > 0 {
        self.ctx
            .opt
            .observer
            .counter(CounterMetric::RetireRecorded, retire_recorded);
    }
    // Stage pending siblings; they reference the data file of this flush.
    if !stage_pending_addrs.is_empty() {
        let data_id = result
            .data_id
            .expect("pending sibling publish requires data file id");
        for &addr in &stage_pending_addrs {
            let pending = crate::meta::PendingSibling::new(
                result.bucket_id,
                data_id,
                PendingRangeKind::Data,
                addr,
            );
            self.manifest.record_pending_sibling(&mut txn, &pending);
        }
    }
    if let Some(data_stat) = &result.data_stat {
        txn.record(MetaKind::DataStat, data_stat);
    }
    txn.record(MetaKind::Map, &result.map_table);
    if let Some(data_ivl) = &result.data_ivl {
        txn.record(MetaKind::DataInterval, data_ivl);
    }
    // Blob stat and interval are recorded together only when both exist.
    if let (Some(blob_ivl), Some(blob_stat)) = (&result.blob_ivl, &result.blob_stat) {
        txn.record(MetaKind::BlobStat, blob_stat);
        txn.record(MetaKind::BlobInterval, blob_ivl);
    }
    // Siblings that became visible in this flush no longer need to be
    // tracked as pending.
    for &addr in &clear_pending_addrs {
        self.manifest.clear_pending_sibling(
            &mut txn,
            result.bucket_id,
            PendingRangeKind::Data,
            addr,
        );
    }
    txn.record(MetaKind::Numerics, self.manifest.numerics.deref());
    // Files referenced by this flush are now reachable from the manifest,
    // so they are no longer orphan candidates.
    if let Some(data_id) = result.data_id {
        self.manifest.clear_orphan_data_file(&mut txn, data_id);
    }
    if let Some(blob_ivl) = result.blob_ivl {
        self.manifest
            .clear_orphan_blob_file(&mut txn, blob_ivl.file_id);
    }
    // Failpoints bracket the commit so crash-recovery on either side of it
    // can be exercised in tests.
    #[cfg(feature = "failpoints")]
    crate::utils::failpoint::check("mace_flush_before_manifest_commit")?;
    txn.commit();
    #[cfg(feature = "failpoints")]
    crate::utils::failpoint::check("mace_flush_after_manifest_commit")?;
    // In-memory stats are applied only after the commit succeeded.
    if let Some(mem_data_stat) = result.mem_data_stat {
        self.manifest.add_data_stat(mem_data_stat);
    }
    if let Some(mem_blob_stat) = result.mem_blob_stat {
        self.manifest.add_blob_stat(mem_blob_stat);
    }
    result.done.mark_done();
    // Advance each group's checkpoint, clamped down to the oldest LSN any
    // active transaction still needs, and sync if the checkpoint moved.
    for (i, g) in groups.iter().enumerate() {
        let mut pos = result.flsn[i];
        if let Some(min) = g.active_txns.min_lsn()
            && min < pos
        {
            pos = min;
        }
        let mut lk = g.logging.lock();
        if lk.update_checkpoint(pos) {
            let mut f = lk.writer.clone();
            drop(lk);
            f.sync();
        }
    }
    Ok(())
}
}
impl FlushObserver for StoreFlushObserver {
    /// Decides whether a bucket's flush should proceed: buckets that are
    /// unknown or being deleted are skipped, everything else is normal.
    fn flush_directive(&self, bucket_id: u64) -> FlushDirective {
        let Some(state) = self.manifest.bucket_states.get(&bucket_id) else {
            return FlushDirective::Skip;
        };
        if state.is_deleting() {
            FlushDirective::Skip
        } else {
            FlushDirective::Normal
        }
    }

    /// Records a data file that is not yet referenced by the manifest.
    fn stage_orphan_data_file(&self, file_id: u64) {
        self.manifest.stage_orphan_data_file(file_id);
    }

    /// Records a blob file that is not yet referenced by the manifest.
    fn stage_orphan_blob_file(&self, file_id: u64) {
        self.manifest.stage_orphan_blob_file(file_id);
    }

    /// Publishes one flush result into the manifest.
    fn on_flush(&self, result: FlushResult) -> Result<(), OpCode> {
        self.publish_one(result)
    }
}
/// Core runtime state shared by the public API.
pub struct Store {
    /// Persistent metadata: buckets, file mappings, stats.
    pub(crate) manifest: Handle<Manifest>,
    /// Runtime context: WAL logging groups, active transactions.
    pub(crate) context: Handle<Context>,
    /// Parsed configuration the store was opened with.
    pub(crate) opt: Arc<ParsedOptions>,
}
impl Store {
    /// Bundles already-constructed manifest, context and options.
    pub fn new(opt: Arc<ParsedOptions>, manifest: Handle<Manifest>, ctx: Handle<Context>) -> Self {
        Self {
            manifest,
            context: ctx,
            opt,
        }
    }

    /// Starts the background machinery owned by the runtime context.
    pub(crate) fn start(&self) {
        self.context.start();
    }

    /// Shuts the store down. The sequence is deliberate: sync outstanding
    /// work, stop bucket workers, stop the context, then reclaim both
    /// handles — do not reorder.
    pub(crate) fn quit(&self) {
        // Best effort: a failing final sync is intentionally ignored so
        // shutdown can still proceed and release resources.
        let _ = self.context.sync();
        self.manifest.buckets.quit();
        self.context.quit();
        self.context.reclaim();
        self.manifest.reclaim();
    }
}
/// Shared internals behind every `Mace` and `Bucket` handle; tears down
/// the GC worker and the store when the last handle is dropped.
pub struct Inner {
    /// The store itself.
    pub(crate) store: MutRef<Store>,
    /// Control handle for the background GC worker.
    pub(crate) gc: GCHandle,
}
impl Inner {
    /// Bucket names must be strictly shorter than this many bytes.
    const MAX_BUCKET_NAME_LEN: usize = 32;

    /// Rejects over-long bucket names with `OpCode::TooLarge`.
    /// Centralizes the length check previously duplicated in every
    /// name-validating method below.
    fn validate_name(name: &str) -> Result<(), OpCode> {
        if name.len() >= Self::MAX_BUCKET_NAME_LEN {
            return Err(OpCode::TooLarge);
        }
        Ok(())
    }

    /// Creates a new bucket and returns a handle rooted at `ROOT_PID`.
    fn new_bucket(this: &Arc<Inner>, name: &str) -> Result<Bucket, OpCode> {
        Self::validate_name(name)?;
        let (meta, bucket_ctx) = this.store.manifest.create_bucket(name)?;
        Ok(Bucket {
            tree: Tree::new(this.store.clone(), ROOT_PID, bucket_ctx),
            _holder: meta,
            inner: this.clone(),
        })
    }

    /// Opens an existing bucket by name.
    fn get_bucket(this: &Arc<Inner>, name: &str) -> Result<Bucket, OpCode> {
        Self::validate_name(name)?;
        let meta = this.store.manifest.load_bucket_meta(name)?;
        let bucket_ctx = this.store.manifest.load_bucket_context(meta.bucket_id)?;
        Ok(Bucket {
            tree: Tree::new(this.store.clone(), ROOT_PID, bucket_ctx),
            _holder: meta,
            inner: this.clone(),
        })
    }

    /// Unloads a bucket after syncing outstanding work; contrast with
    /// `del_bucket`, which deletes it.
    fn drop_bucket(self: &Inner, name: &str) -> Result<(), OpCode> {
        self.store.context.sync()?;
        self.store.manifest.unload_bucket(name)
    }

    /// Deletes a bucket via the manifest.
    fn del_bucket(self: &Inner, name: &str) -> Result<(), OpCode> {
        self.store.manifest.delete_bucket(name)
    }

    /// Runs a vacuum pass over one bucket and reports the stats.
    fn vacuum_bucket(self: &Inner, name: &str) -> Result<VacuumStats, OpCode> {
        Self::validate_name(name)?;
        let meta = self.store.manifest.load_bucket_meta(name)?;
        let bucket_ctx = self.store.manifest.load_bucket_context(meta.bucket_id)?;
        crate::store::gc::vacuum_bucket(self.store.clone(), bucket_ctx)
    }

    /// Returns whether the named bucket currently has a vacuum running.
    fn is_bucket_vacuuming(self: &Inner, name: &str) -> Result<bool, OpCode> {
        Self::validate_name(name)?;
        let meta = self.store.manifest.load_bucket_meta(name)?;
        let bucket_ctx = self.store.manifest.load_bucket_context(meta.bucket_id)?;
        Ok(bucket_ctx.state.is_vacuuming())
    }

    /// Compacts the manifest itself toward the configured target size.
    fn vacuum_meta(self: &Inner) -> Result<MetaVacuumStats, OpCode> {
        self.store.manifest.vacuum_meta(META_VACUUM_TARGET_BYTES)
    }
}
impl Drop for Inner {
    /// Stops the GC worker before shutting the store down — GC was started
    /// with a clone of the store, so it must quit first.
    fn drop(&mut self) {
        self.gc.quit();
        self.store.quit();
    }
}
/// Cloneable handle to a named bucket.
#[derive(Clone)]
pub struct Bucket {
    /// Index tree rooted at this bucket's root page.
    pub(crate) tree: Tree,
    /// Keeps the bucket metadata alive while this handle exists.
    pub(crate) _holder: Arc<BucketMeta>,
    /// Back-reference to the store internals (keeps them alive too).
    pub(crate) inner: Arc<Inner>,
}
impl Bucket {
    /// Starts a read-write transaction scoped to this bucket.
    pub fn begin(&'_ self) -> Result<TxnKV<'_>, OpCode> {
        TxnKV::new(&self.inner.store.context, &self.tree)
    }

    /// Starts a read-only view scoped to this bucket.
    pub fn view(&'_ self) -> Result<TxnView<'_>, OpCode> {
        TxnView::new(&self.inner.store.context, &self.tree)
    }

    /// Numeric id of this bucket.
    pub fn id(&self) -> u64 {
        self.tree.bucket_id()
    }

    /// Options the store was opened with.
    pub fn options(&self) -> &Options {
        &self.inner.store.opt
    }
}
/// Auto-derefs to `Inner` so store-level operations (vacuum, drop, …)
/// can be called directly on a bucket handle.
impl Deref for Bucket {
    type Target = Inner;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
/// Top-level store handle; cloning shares the same underlying instance.
#[derive(Clone)]
pub struct Mace {
    pub(crate) inner: Arc<Inner>,
}
impl Mace {
/// Opens (or creates) a store from parsed options: loads the manifest,
/// runs both recovery phases, wires the flush observer and data reader,
/// then starts the store, the GC worker and the evictor.
pub fn new(opt: ParsedOptions) -> Result<Self, OpCode> {
    let opt = Arc::new(opt);
    // Two cross-wired channels: the manifest side sends via `tx`
    // (received by the evictor on `erx`) and receives on `rx` what the
    // evictor sends via `etx`.
    let (tx, erx) = channel();
    let (etx, rx) = channel();
    let mut builder = ManifestBuilder::new_with_channels(opt.clone(), tx, rx);
    builder.load()?;
    let manifest = Handle::new(builder.finish());
    let mut recover = Recovery::new(opt.clone());
    // Phase 1 rebuilds the runtime context and finds the WAL boot point.
    let (wal_boot, ctx) = recover.phase1(manifest.numerics.clone())?;
    let observer = Arc::new(StoreFlushObserver::new(manifest, ctx));
    let reader = Arc::new(StoreDataReader::new(manifest));
    // Observer and reader are installed before phase 2 replays the log.
    manifest.set_context(ctx, reader, observer);
    manifest.recover_pending_retires_to_stats();
    let store = MutRef::new(Store::new(opt.clone(), manifest, ctx));
    recover.phase2(&wal_boot, store.clone())?;
    store.start();
    let handle = start_gc(store.clone(), store.context);
    let evictor = Evictor::new(
        opt.clone(),
        manifest.buckets,
        manifest.numerics.clone(),
        manifest.buckets.used.clone(),
        erx,
        etx,
    );
    evictor.start();
    Ok(Self {
        inner: Arc::new(Inner { store, gc: handle }),
    })
}
/// Options this instance was opened with.
pub fn options(&self) -> &Options {
    &self.inner.store.opt
}

/// Creates a new bucket; fails with `OpCode::TooLarge` when the name is
/// 32 bytes or longer.
pub fn new_bucket<S: AsRef<str>>(&self, name: S) -> Result<Bucket, OpCode> {
    Inner::new_bucket(&self.inner, name.as_ref())
}

/// Opens an existing bucket by name.
pub fn get_bucket<S: AsRef<str>>(&self, name: S) -> Result<Bucket, OpCode> {
    Inner::get_bucket(&self.inner, name.as_ref())
}

/// Names of the buckets currently loaded.
pub fn active_buckets(&self) -> Vec<String> {
    self.inner.store.manifest.loaded_bucket_names()
}

/// Syncs outstanding work, then unloads the named bucket (its data is
/// not deleted; see `del_bucket` for that).
pub fn drop_bucket<S: AsRef<str>>(&self, name: S) -> Result<(), OpCode> {
    Inner::drop_bucket(&self.inner, name.as_ref())
}

/// Deletes the named bucket.
pub fn del_bucket<S: AsRef<str>>(&self, name: S) -> Result<(), OpCode> {
    Inner::del_bucket(&self.inner, name.as_ref())
}

/// Runs a vacuum pass over the named bucket and returns its stats.
pub fn vacuum_bucket<S: AsRef<str>>(&self, name: S) -> Result<VacuumStats, OpCode> {
    Inner::vacuum_bucket(&self.inner, name.as_ref())
}

/// Whether the named bucket currently has a vacuum in progress.
pub fn is_bucket_vacuuming<S: AsRef<str>>(&self, name: S) -> Result<bool, OpCode> {
    Inner::is_bucket_vacuuming(&self.inner, name.as_ref())
}

/// Compacts the manifest itself and returns the resulting stats.
pub fn vacuum_meta(&self) -> Result<MetaVacuumStats, OpCode> {
    Inner::vacuum_meta(&self.inner)
}

/// Pauses the background GC worker.
pub fn disable_gc(&self) {
    self.inner.gc.pause();
}

/// Resumes the background GC worker.
pub fn enable_gc(&self) {
    self.inner.gc.resume();
}

/// Kicks off a GC run via the GC handle.
pub fn start_gc(&self) {
    self.inner.gc.start();
}

/// Data-file GC count as reported by the GC handle.
pub fn data_gc_count(&self) -> u64 {
    self.inner.gc.data_gc_count()
}

/// Blob-file GC count as reported by the GC handle.
pub fn blob_gc_count(&self) -> u64 {
    self.inner.gc.blob_gc_count()
}

/// Current number of buckets (relaxed atomic read; may race with
/// concurrent create/delete).
pub fn nr_buckets(&self) -> u64 {
    self.inner
        .store
        .manifest
        .nr_buckets
        .load(std::sync::atomic::Ordering::Relaxed)
}

/// Forces a sync of outstanding work through the runtime context.
pub fn sync(&self) -> Result<(), OpCode> {
    self.inner.store.context.sync()
}
}