Skip to main content

mace/store/
store.rs

1use parking_lot::Mutex;
2
3use crate::cc::context::Context;
4use crate::index::tree::Tree;
5pub use crate::index::txn::{TxnKV, TxnView};
6use crate::map::IDataReader;
7use crate::map::evictor::Evictor;
8use crate::map::flush::{CheckpointObserver, FlushDirective, FlushResult};
9use crate::meta::builder::ManifestBuilder;
10use crate::meta::{
11    BlobStat, BucketMeta, DataStat, IntervalPair, Manifest, MemBlobStat, MemDataStat, MetaKind, Txn,
12};
13use crate::store::gc::{GCHandle, start_gc};
14use crate::store::recovery::Recovery;
15use crate::store::{META_VACUUM_TARGET_BYTES, MetaVacuumStats, VacuumStats};
16use crate::types::refbox::{BoxRef, BoxView};
17use crate::utils::Handle;
18use crate::utils::MutRef;
19pub use crate::utils::OpCode;
20use crate::utils::ROOT_PID;
21use crate::utils::data::init_group_pos;
22pub use crate::utils::options::Options;
23use crate::utils::options::{BucketOptions, ParsedOptions};
24use std::collections::BTreeMap;
25use std::ops::Deref;
26use std::sync::Arc;
27use std::sync::atomic::Ordering::Relaxed;
28use std::sync::mpsc::channel;
29
/// Checkpoint observer that publishes flush results into the manifest and
/// advances WAL checkpoints afterwards.
struct StoreFlushObserver {
    /// Manifest that flushed intervals, stats and frontiers are recorded into.
    manifest: Handle<Manifest>,
    /// Engine context; provides the WAL groups and the `sync_on_write` option.
    ctx: Handle<Context>,
    /// GC handle used by `finish_checkpoint`; `None` until `attach_handle` is called.
    handle: Mutex<Option<GCHandle>>,
}
35
36struct StoreDataReader {
37    meta: Handle<Manifest>,
38}
39
40impl StoreDataReader {
41    fn new(meta: Handle<Manifest>) -> Self {
42        Self { meta }
43    }
44
45    fn read_data(
46        &self,
47        bucket_id: u64,
48        addr: u64,
49        cache: &dyn Fn(BoxRef),
50    ) -> Result<BoxRef, OpCode> {
51        self.meta.load_data(bucket_id, addr, cache)
52    }
53
54    fn read_blob(
55        &self,
56        bucket_id: u64,
57        addr: u64,
58        cache: &dyn Fn(BoxView),
59    ) -> Result<BoxRef, OpCode> {
60        self.meta.load_blob(bucket_id, addr, cache)
61    }
62}
63
64impl IDataReader for StoreDataReader {
65    fn load_data(
66        &self,
67        bucket_id: u64,
68        addr: u64,
69        cache: &dyn Fn(BoxRef),
70    ) -> Result<BoxRef, OpCode> {
71        self.read_data(bucket_id, addr, cache)
72    }
73
74    fn load_blob(
75        &self,
76        bucket_id: u64,
77        addr: u64,
78        cache: &dyn Fn(BoxView),
79    ) -> Result<BoxRef, OpCode> {
80        self.read_blob(bucket_id, addr, cache)
81    }
82}
83
impl StoreFlushObserver {
    /// Creates an observer bound to `manifest` and `ctx`.
    ///
    /// No GC handle is attached yet; `finish_checkpoint` is a no-op until
    /// `attach_handle` has been called.
    fn new(manifest: Handle<Manifest>, ctx: Handle<Context>) -> Self {
        Self {
            manifest,
            ctx,
            handle: Mutex::new(None),
        }
    }

    /// Failpoint-only: logs which publish stage failed and aborts the process
    /// without unwinding.
    #[cfg(feature = "failpoints")]
    #[cold]
    fn abort_flush_publish(stage: &str, err: OpCode) -> ! {
        log::error!("flush publish {} failed: {:?}", stage, err);
        std::process::abort()
    }

    /// Stores the GC handle used by `finish_checkpoint`, replacing any previous one.
    fn attach_handle(&self, handle: GCHandle) {
        self.handle.lock().replace(handle);
    }

    /// Builds the per-file data and blob stats to persist for one flush of
    /// `result.bucket_id`.
    ///
    /// Stats returned by `apply_data_junks` / `apply_blob_junks` are inserted
    /// first and win. The in-memory stats carried by `result` are only used for
    /// file ids not already present. For every new data/blob interval,
    /// `clear_orphan_*_file` is called with `txn` for that interval's file id.
    ///
    /// `result.data_stats` and `result.blob_stats` are drained. Both returned
    /// vectors are ordered by file id, because they come from a `BTreeMap` keyed
    /// by file id.
    fn update_stat_interval(
        &self,
        txn: &mut Txn,
        result: &mut FlushResult,
    ) -> (Vec<DataStat>, Vec<BlobStat>) {
        let bucket_id = result.bucket_id;
        // largest file id written by this flush, or the manifest's next data id when
        // the flush produced no data intervals; handed to `apply_data_junks`
        let data_tick = result
            .data_ivls
            .iter()
            .map(|x| x.file_id)
            .max()
            .unwrap_or_else(|| self.manifest.numerics.next_data_id.load(Relaxed));
        let mut data_by_file = BTreeMap::<u64, DataStat>::new();
        for stat in self
            .manifest
            .apply_data_junks(bucket_id, data_tick, &result.data_junk)
        {
            data_by_file.insert(stat.file_id, stat);
        }
        let mut blob_by_file = BTreeMap::<u64, BlobStat>::new();
        for stat in self.manifest.apply_blob_junks(bucket_id, &result.blob_junk) {
            blob_by_file.insert(stat.file_id, stat);
        }

        // stats and intervals are paired positionally below; `zip` silently stops at
        // the shorter list, so `extra_check` builds assert the lengths agree
        #[cfg(feature = "extra_check")]
        assert_eq!(result.data_stats.len(), result.data_ivls.len());
        #[cfg(feature = "extra_check")]
        assert_eq!(result.blob_stats.len(), result.blob_ivls.len());

        for (mem_stat, ivl) in result
            .data_stats
            .drain(..)
            .zip(result.data_ivls.iter().copied())
        {
            // keep the junk-adjusted stat if one exists for this file
            data_by_file
                .entry(mem_stat.file_id)
                .or_insert_with(|| mem_stat);
            self.manifest.clear_orphan_data_file(txn, ivl.file_id);
        }

        for (mem_stat, ivl) in result
            .blob_stats
            .drain(..)
            .zip(result.blob_ivls.iter().copied())
        {
            // keep the junk-adjusted stat if one exists for this file
            blob_by_file
                .entry(mem_stat.file_id)
                .or_insert_with(|| mem_stat);
            self.manifest.clear_orphan_blob_file(txn, ivl.file_id);
        }
        (
            data_by_file.into_values().collect(),
            blob_by_file.into_values().collect(),
        )
    }

    /// Makes a completed flush durable in the manifest, then advances WAL checkpoints.
    ///
    /// The steps run strictly in this order:
    /// 1. For every WAL group whose checkpoint position in this flush is past the
    ///    bucket's previously recorded frontier, sync that group's checkpoint barrier.
    /// 2. `result.sync()`.
    /// 3. Merge the bucket frontier, then record the new intervals, the per-file
    ///    stats, the frontier, the map table and the numerics in one manifest txn
    ///    and commit it.
    /// 4. Call `clear_synced_data` / `clear_synced_blob` on the manifest.
    /// 5. Move each group's WAL checkpoint to the global frontier lower bound.
    ///    The oldest active txn LSN is used instead when it is smaller. When the
    ///    checkpoint moved and `sync_on_write` is set, the WAL writer is synced.
    ///
    /// # Panics
    /// Panics if syncing a WAL checkpoint barrier fails.
    fn publish(&self, mut result: FlushResult) {
        let has_new_files = !result.data_ivls.is_empty() || !result.blob_ivls.is_empty();
        let bucket_id = result.bucket_id;
        let frontier_delta = *result.latest_chkpoint_lsn.deref();
        let previous_frontier = self
            .manifest
            .bucket_frontier
            .get(&bucket_id)
            .map(|x| *x.value())
            .unwrap_or_else(init_group_pos);
        let groups = self.ctx.groups();

        // page checkpoint can fold uncleaned txn versions into durable pages, recovery still needs
        // the corresponding WAL tail to rebuild tx outcomes before safe_txid can expose them
        for (i, g) in groups.iter().enumerate() {
            if i < frontier_delta.len() && frontier_delta[i] > previous_frontier[i] {
                let mut log = g.logging.lock();

                log.sync_checkpoint_barrier()
                    .inspect_err(|e| {
                        log::error!("can't sync WAL checkpoint barrier, {:?}", e);
                    })
                    .expect("can't fail");
            }
        }

        result.sync(); // must be called before updating manifest
        if has_new_files {
            #[cfg(feature = "failpoints")]
            crate::utils::failpoint::crash("mace_flush_after_data_dir_sync");
        }
        let bucket_frontier = self
            .manifest
            .merge_bucket_frontier(bucket_id, &frontier_delta);
        let mut txn = self.manifest.begin();
        let (data_stats, blob_stats) = self.update_stat_interval(&mut txn, &mut result);

        for ivl in &result.data_ivls {
            txn.record(MetaKind::DataInterval, ivl);
        }
        for ivl in &result.blob_ivls {
            txn.record(MetaKind::BlobInterval, ivl);
        }

        data_stats.iter().for_each(|x| {
            txn.record(MetaKind::DataStat, x);
        });
        blob_stats.iter().for_each(|x| {
            txn.record(MetaKind::BlobStat, x);
        });

        txn.record(MetaKind::BucketFrontier, &bucket_frontier);
        txn.record(MetaKind::Map, &result.map_table);
        txn.record(MetaKind::Numerics, self.manifest.numerics.deref());

        #[cfg(feature = "failpoints")]
        if let Err(e) = crate::utils::failpoint::check("mace_flush_before_manifest_commit") {
            Self::abort_flush_publish("before manifest commit", e);
        }
        txn.commit();

        self.manifest.clear_synced_data();
        self.manifest.clear_synced_blob();

        #[cfg(feature = "failpoints")]
        if let Err(e) = crate::utils::failpoint::check("mace_flush_after_manifest_commit") {
            Self::abort_flush_publish("after manifest commit", e);
        }

        let groups = self.ctx.groups();
        let sync = self.ctx.opt.sync_on_write;
        let global_frontier = self.manifest.global_frontier_lower_bound(groups.len());

        for (i, g) in groups.iter().enumerate() {
            // never move the checkpoint past the oldest still-active txn of this group
            let mut pos = global_frontier[i];
            if let Some(min) = g.active_txns.min_lsn()
                && min < pos
            {
                pos = min;
            }
            let mut lk = g.logging.lock();
            if lk.update_checkpoint(pos) && sync {
                // release the logging lock before the (potentially slow) writer sync
                let mut f = lk.writer.clone();
                drop(lk);
                // checkpoint must be synced in durable mode
                f.sync();
            }
        }
    }
}
250
251impl CheckpointObserver for StoreFlushObserver {
252    fn flush_directive(&self, bucket_id: u64) -> FlushDirective {
253        match self.manifest.bucket_states.get(&bucket_id) {
254            Some(state) => {
255                if state.is_deleting() {
256                    return FlushDirective::Skip;
257                }
258                FlushDirective::Normal
259            }
260            None => FlushDirective::Skip,
261        }
262    }
263
264    fn stage_unsynced_data_file(&self, file_id: u64) {
265        self.manifest.stage_unsynced_data_file(file_id);
266    }
267
268    fn stage_unsynced_blob_file(&self, file_id: u64) {
269        self.manifest.stage_unsynced_blob_file(file_id);
270    }
271
272    fn stage_orphan_data_file(&self, file_id: u64) {
273        self.manifest.stage_orphan_data_file(file_id);
274    }
275
276    fn stage_orphan_blob_file(&self, file_id: u64) {
277        self.manifest.stage_orphan_blob_file(file_id);
278    }
279
280    fn update_data_mem_interval_stat(&self, ivl: IntervalPair, stat: MemDataStat) {
281        self.manifest.add_data_stat(stat, ivl);
282    }
283
284    fn update_blob_mem_interval_stat(&self, ivl: IntervalPair, stat: MemBlobStat) {
285        self.manifest.add_blob_stat(stat, ivl);
286    }
287
288    fn on_checkpoint(&self, result: FlushResult) {
289        self.publish(result)
290    }
291
292    fn finish_checkpoint(&self) {
293        let h = self.handle.lock();
294        if let Some(h) = h.as_ref() {
295            h.wal_clean(self.ctx);
296        }
297    }
298}
299
300pub struct Store {
301    pub(crate) manifest: Handle<Manifest>,
302    pub(crate) context: Handle<Context>,
303    pub(crate) opt: Arc<ParsedOptions>,
304}
305
306impl Store {
307    pub fn new(opt: Arc<ParsedOptions>, manifest: Handle<Manifest>, ctx: Handle<Context>) -> Self {
308        Self {
309            manifest,
310            context: ctx,
311            opt,
312        }
313    }
314
315    pub(crate) fn start(&self) {
316        self.context.start();
317    }
318
319    pub(crate) fn quit(&self) {
320        // 1) stop new writes and flush outstanding WAL
321        let _ = self.context.sync();
322
323        // 2) stop background workers in order: evictor -> flusher -> buckets
324        // bucket.quit will send Quit to evictor thread and wait ack
325        self.manifest.buckets.quit();
326
327        // 3) after evictor/flush threads stopped, shut down WAL threads
328        self.context.quit();
329
330        // 4) reclaim contexts (arena/page caches) first, then manifest
331        self.context.reclaim();
332        self.manifest.reclaim();
333    }
334}
335
336/// The internal storage engine instance.
337pub struct Inner {
338    pub(crate) store: MutRef<Store>,
339    pub(crate) gc: GCHandle,
340}
341
342impl Inner {
343    const MAX_BUCKET_NAME_LEN: usize = 32;
344
345    fn new_bucket(this: &Arc<Inner>, name: &str, opt: BucketOptions) -> Result<Bucket, OpCode> {
346        if name.len() >= Self::MAX_BUCKET_NAME_LEN {
347            return Err(OpCode::TooLarge);
348        }
349        let (meta, bucket_ctx) = this.store.manifest.create_bucket(name, opt)?;
350
351        Ok(Bucket {
352            tree: Tree::new(this.store.clone(), ROOT_PID, bucket_ctx),
353            _holder: meta,
354            inner: this.clone(),
355        })
356    }
357
358    fn get_bucket(this: &Arc<Inner>, name: &str) -> Result<Bucket, OpCode> {
359        if name.len() >= Self::MAX_BUCKET_NAME_LEN {
360            return Err(OpCode::TooLarge);
361        }
362        let meta = this.store.manifest.load_bucket_meta(name)?;
363        let bucket_ctx = this.store.manifest.load_bucket_context(meta.id)?;
364
365        Ok(Bucket {
366            tree: Tree::new(this.store.clone(), ROOT_PID, bucket_ctx),
367            _holder: meta,
368            inner: this.clone(),
369        })
370    }
371
372    fn update_bucket_opt(this: &Arc<Inner>, name: &str, opt: BucketOptions) -> Result<(), OpCode> {
373        if name.len() >= Self::MAX_BUCKET_NAME_LEN {
374            return Err(OpCode::TooLarge);
375        }
376        this.store.manifest.update_bucket_options(name, opt)
377    }
378
379    /// manually unload bucket to release memory
380    fn drop_bucket(self: &Inner, name: &str) -> Result<(), OpCode> {
381        self.store.context.sync()?;
382        self.store.manifest.unload_bucket(name)
383    }
384
385    fn del_bucket(self: &Inner, name: &str) -> Result<(), OpCode> {
386        self.store.manifest.delete_bucket(name)
387    }
388
389    fn vacuum_bucket(self: &Inner, name: &str) -> Result<VacuumStats, OpCode> {
390        if name.len() >= Self::MAX_BUCKET_NAME_LEN {
391            return Err(OpCode::TooLarge);
392        }
393        let meta = self.store.manifest.load_bucket_meta(name)?;
394        let bucket_ctx = self.store.manifest.load_bucket_context(meta.id)?;
395        crate::store::gc::vacuum_bucket(self.store.clone(), bucket_ctx)
396    }
397
398    fn is_bucket_vacuuming(self: &Inner, name: &str) -> Result<bool, OpCode> {
399        if name.len() >= Self::MAX_BUCKET_NAME_LEN {
400            return Err(OpCode::TooLarge);
401        }
402        let meta = self.store.manifest.load_bucket_meta(name)?;
403        let bucket_ctx = self.store.manifest.load_bucket_context(meta.id)?;
404        Ok(bucket_ctx.state.is_vacuuming())
405    }
406
407    fn vacuum_meta(self: &Inner) -> Result<MetaVacuumStats, OpCode> {
408        self.store.manifest.vacuum_meta(META_VACUUM_TARGET_BYTES)
409    }
410
411    fn checkpoint(&self, bucket_id: u64) {
412        if let Ok(ctx) = self.store.manifest.load_bucket_context(bucket_id) {
413            ctx.checkpoint();
414        }
415    }
416}
417
/// Shuts the engine down once the last `Arc<Inner>` is released. Both `Mace`
/// and every `Bucket` hold one.
impl Drop for Inner {
    fn drop(&mut self) {
        // GC was started with a clone of the store (see `Mace::new`), so stop it
        // before the store tears down its workers and reclaims memory
        self.gc.quit();
        self.store.quit();
    }
}
424
425/// A bucket is a named collection of key-value pairs.
426#[derive(Clone)]
427pub struct Bucket {
428    pub(crate) tree: Tree,
429    pub(crate) _holder: Arc<BucketMeta>,
430    pub(crate) inner: Arc<Inner>,
431}
432
433impl Bucket {
434    /// Begins a new read-write transaction.
435    pub fn begin(&'_ self) -> Result<TxnKV<'_>, OpCode> {
436        TxnKV::new(&self.inner.store.context, &self.tree)
437    }
438
439    /// Begins a new read-only transaction (view).
440    pub fn view(&'_ self) -> Result<TxnView<'_>, OpCode> {
441        TxnView::new(&self.inner.store.context, &self.tree)
442    }
443
444    /// Starts a manual checkpoint which will flush dirty pages to disk and may trigger WAL gc
445    pub fn checkpoint(&self) {
446        self.inner.checkpoint(self.id());
447    }
448
449    /// Returns the unique identifier of this bucket.
450    pub fn id(&self) -> u64 {
451        self.tree.bucket_id()
452    }
453
454    /// Returns the options used by this bucket.
455    pub fn options(&self) -> &Options {
456        &self.inner.store.opt
457    }
458}
459
460impl Deref for Bucket {
461    type Target = Inner;
462
463    fn deref(&self) -> &Self::Target {
464        &self.inner
465    }
466}
467
468/// The main entry point for the Mace storage engine.
469#[derive(Clone)]
470pub struct Mace {
471    pub(crate) inner: Arc<Inner>,
472}
473
474impl Mace {
475    /// Creates a new Mace instance with the given options.
476    pub fn new(opt: ParsedOptions) -> Result<Self, OpCode> {
477        let opt = Arc::new(opt);
478        let (tx, erx) = channel();
479        let (etx, rx) = channel();
480
481        let mut builder = ManifestBuilder::new_with_channels(opt.clone(), tx, rx);
482        builder.load()?;
483        let manifest = Handle::new(builder.finish());
484
485        let mut recover = Recovery::new(opt.clone());
486        let (wal_boot, ctx) = recover.phase1(manifest.numerics.clone())?;
487        let observer = Arc::new(StoreFlushObserver::new(manifest, ctx));
488        let reader = Arc::new(StoreDataReader::new(manifest));
489        manifest.set_context(ctx, reader, observer.clone());
490
491        let store = MutRef::new(Store::new(opt.clone(), manifest, ctx));
492
493        recover.phase2(&wal_boot, store.clone())?;
494        store.start();
495        let handle = start_gc(store.clone(), store.context);
496        observer.attach_handle(handle.clone());
497        let evictor = Evictor::new(
498            opt.clone(),
499            manifest.buckets,
500            manifest.numerics.clone(),
501            erx,
502            etx,
503        );
504        evictor.start();
505
506        Ok(Self {
507            inner: Arc::new(Inner { store, gc: handle }),
508        })
509    }
510
511    /// Returns the options used by this Mace instance.
512    pub fn options(&self) -> &Options {
513        &self.inner.store.opt
514    }
515
516    /// Creates a bucket with the given name.
517    /// NOTE: name must be less than 32 bytes.
518    pub fn new_bucket<S: AsRef<str>>(&self, name: S, opt: BucketOptions) -> Result<Bucket, OpCode> {
519        Inner::new_bucket(&self.inner, name.as_ref(), opt.validate())
520    }
521
522    /// Gets an existing bucket with the given name.
523    /// NOTE: name must be less than 32 bytes.
524    pub fn get_bucket<S: AsRef<str>>(&self, name: S) -> Result<Bucket, OpCode> {
525        Inner::get_bucket(&self.inner, name.as_ref())
526    }
527
528    /// Updates the persisted bucket-scoped options of an existing bucket
529    ///
530    /// Returns [`OpCode::Again`] if the bucket is currently loaded
531    ///
532    /// Returns [`OpCode::Invalid`] if the requested [`BucketOptions`] conflict with
533    /// persisted compatibility-sensitive bucket options
534    pub fn update_bucket_opt<S: AsRef<str>>(
535        &self,
536        name: S,
537        opt: BucketOptions,
538    ) -> Result<(), OpCode> {
539        Inner::update_bucket_opt(&self.inner, name.as_ref(), opt.validate())
540    }
541
542    /// Returns a list of all active bucket names.
543    pub fn active_buckets(&self) -> Vec<String> {
544        self.inner.store.manifest.loaded_bucket_names()
545    }
546
547    /// Manually unloads a bucket to release memory.
548    pub fn drop_bucket<S: AsRef<str>>(&self, name: S) -> Result<(), OpCode> {
549        Inner::drop_bucket(&self.inner, name.as_ref())
550    }
551
552    /// Deletes a bucket and all its data.
553    pub fn del_bucket<S: AsRef<str>>(&self, name: S) -> Result<(), OpCode> {
554        Inner::del_bucket(&self.inner, name.as_ref())
555    }
556
557    /// Vacuums a bucket by scavenging and compacting its pages
558    pub fn vacuum_bucket<S: AsRef<str>>(&self, name: S) -> Result<VacuumStats, OpCode> {
559        Inner::vacuum_bucket(&self.inner, name.as_ref())
560    }
561
562    /// Returns whether bucket vacuum is currently running
563    pub fn is_bucket_vacuuming<S: AsRef<str>>(&self, name: S) -> Result<bool, OpCode> {
564        Inner::is_bucket_vacuuming(&self.inner, name.as_ref())
565    }
566
567    /// Vacuums metadata by compacting the manifest btree
568    pub fn vacuum_meta(&self) -> Result<MetaVacuumStats, OpCode> {
569        Inner::vacuum_meta(&self.inner)
570    }
571
572    /// Disables garbage collection.
573    pub fn disable_gc(&self) {
574        self.inner.gc.pause();
575    }
576
577    /// Enables garbage collection.
578    pub fn enable_gc(&self) {
579        self.inner.gc.resume();
580    }
581
582    /// Starts a garbage collection cycle immediately.
583    pub fn start_gc(&self) {
584        self.inner.gc.start();
585    }
586
587    /// Returns the number of data garbage collection cycles performed.
588    pub fn data_gc_count(&self) -> u64 {
589        self.inner.gc.data_gc_count()
590    }
591
592    /// Returns the number of blob garbage collection cycles performed.
593    pub fn blob_gc_count(&self) -> u64 {
594        self.inner.gc.blob_gc_count()
595    }
596
597    /// Returns the total number of buckets, including active and pending deletion ones.
598    pub fn nr_buckets(&self) -> u64 {
599        self.inner
600            .store
601            .manifest
602            .nr_buckets
603            .load(std::sync::atomic::Ordering::Relaxed)
604    }
605
606    /// Synchronizes all WAL to disk.
607    pub fn sync(&self) -> Result<(), OpCode> {
608        self.inner.store.context.sync()
609    }
610}