1use parking_lot::Mutex;
2
3use crate::cc::context::Context;
4use crate::index::tree::Tree;
5pub use crate::index::txn::{TxnKV, TxnView};
6use crate::map::IDataReader;
7use crate::map::evictor::Evictor;
8use crate::map::flush::{CheckpointObserver, FlushDirective, FlushResult};
9use crate::meta::builder::ManifestBuilder;
10use crate::meta::{
11 BlobStat, BucketMeta, DataStat, IntervalPair, Manifest, MemBlobStat, MemDataStat, MetaKind, Txn,
12};
13use crate::store::gc::{GCHandle, start_gc};
14use crate::store::recovery::Recovery;
15use crate::store::{META_VACUUM_TARGET_BYTES, MetaVacuumStats, VacuumStats};
16use crate::types::refbox::{BoxRef, BoxView};
17use crate::utils::Handle;
18use crate::utils::MutRef;
19pub use crate::utils::OpCode;
20use crate::utils::ROOT_PID;
21use crate::utils::data::init_group_pos;
22pub use crate::utils::options::Options;
23use crate::utils::options::{BucketOptions, ParsedOptions};
24use std::collections::BTreeMap;
25use std::ops::Deref;
26use std::sync::Arc;
27use std::sync::atomic::Ordering::Relaxed;
28use std::sync::mpsc::channel;
29
/// Flush/checkpoint observer that publishes flush results into the manifest
/// and advances WAL checkpoints through the shared context.
struct StoreFlushObserver {
    /// Manifest that receives intervals, stats and frontiers on publish.
    manifest: Handle<Manifest>,
    /// Shared context; used for WAL groups and the `sync_on_write` option.
    ctx: Handle<Context>,
    /// GC handle, `None` until `attach_handle` is called.
    handle: Mutex<Option<GCHandle>>,
}
35
36struct StoreDataReader {
37 meta: Handle<Manifest>,
38}
39
40impl StoreDataReader {
41 fn new(meta: Handle<Manifest>) -> Self {
42 Self { meta }
43 }
44
45 fn read_data(
46 &self,
47 bucket_id: u64,
48 addr: u64,
49 cache: &dyn Fn(BoxRef),
50 ) -> Result<BoxRef, OpCode> {
51 self.meta.load_data(bucket_id, addr, cache)
52 }
53
54 fn read_blob(
55 &self,
56 bucket_id: u64,
57 addr: u64,
58 cache: &dyn Fn(BoxView),
59 ) -> Result<BoxRef, OpCode> {
60 self.meta.load_blob(bucket_id, addr, cache)
61 }
62}
63
64impl IDataReader for StoreDataReader {
65 fn load_data(
66 &self,
67 bucket_id: u64,
68 addr: u64,
69 cache: &dyn Fn(BoxRef),
70 ) -> Result<BoxRef, OpCode> {
71 self.read_data(bucket_id, addr, cache)
72 }
73
74 fn load_blob(
75 &self,
76 bucket_id: u64,
77 addr: u64,
78 cache: &dyn Fn(BoxView),
79 ) -> Result<BoxRef, OpCode> {
80 self.read_blob(bucket_id, addr, cache)
81 }
82}
83
84impl StoreFlushObserver {
85 fn new(manifest: Handle<Manifest>, ctx: Handle<Context>) -> Self {
86 Self {
87 manifest,
88 ctx,
89 handle: Mutex::new(None),
90 }
91 }
92
93 #[cfg(feature = "failpoints")]
94 #[cold]
95 fn abort_flush_publish(stage: &str, err: OpCode) -> ! {
96 log::error!("flush publish {} failed: {:?}", stage, err);
97 std::process::abort()
98 }
99
100 fn attach_handle(&self, handle: GCHandle) {
101 self.handle.lock().replace(handle);
102 }
103
104 fn update_stat_interval(
105 &self,
106 txn: &mut Txn,
107 result: &mut FlushResult,
108 ) -> (Vec<DataStat>, Vec<BlobStat>) {
109 let bucket_id = result.bucket_id;
110 let data_tick = result
111 .data_ivls
112 .iter()
113 .map(|x| x.file_id)
114 .max()
115 .unwrap_or_else(|| self.manifest.numerics.next_data_id.load(Relaxed));
116 let mut data_by_file = BTreeMap::<u64, DataStat>::new();
117 for stat in self
118 .manifest
119 .apply_data_junks(bucket_id, data_tick, &result.data_junk)
120 {
121 data_by_file.insert(stat.file_id, stat);
122 }
123 let mut blob_by_file = BTreeMap::<u64, BlobStat>::new();
124 for stat in self.manifest.apply_blob_junks(bucket_id, &result.blob_junk) {
125 blob_by_file.insert(stat.file_id, stat);
126 }
127
128 #[cfg(feature = "extra_check")]
129 assert_eq!(result.data_stats.len(), result.data_ivls.len());
130 #[cfg(feature = "extra_check")]
131 assert_eq!(result.blob_stats.len(), result.blob_ivls.len());
132
133 for (mem_stat, ivl) in result
134 .data_stats
135 .drain(..)
136 .zip(result.data_ivls.iter().copied())
137 {
138 data_by_file
139 .entry(mem_stat.file_id)
140 .or_insert_with(|| mem_stat);
141 self.manifest.clear_orphan_data_file(txn, ivl.file_id);
142 }
143
144 for (mem_stat, ivl) in result
145 .blob_stats
146 .drain(..)
147 .zip(result.blob_ivls.iter().copied())
148 {
149 blob_by_file
150 .entry(mem_stat.file_id)
151 .or_insert_with(|| mem_stat);
152 self.manifest.clear_orphan_blob_file(txn, ivl.file_id);
153 }
154 (
155 data_by_file.into_values().collect(),
156 blob_by_file.into_values().collect(),
157 )
158 }
159
160 fn publish(&self, mut result: FlushResult) {
161 let has_new_files = !result.data_ivls.is_empty() || !result.blob_ivls.is_empty();
162 let bucket_id = result.bucket_id;
163 let frontier_delta = *result.latest_chkpoint_lsn.deref();
164 let previous_frontier = self
165 .manifest
166 .bucket_frontier
167 .get(&bucket_id)
168 .map(|x| *x.value())
169 .unwrap_or_else(init_group_pos);
170 let groups = self.ctx.groups();
171
172 for (i, g) in groups.iter().enumerate() {
175 if i < frontier_delta.len() && frontier_delta[i] > previous_frontier[i] {
176 let mut log = g.logging.lock();
177
178 log.sync_checkpoint_barrier()
179 .inspect_err(|e| {
180 log::error!("can't sync WAL checkpoint barrier, {:?}", e);
181 })
182 .expect("can't fail");
183 }
184 }
185
186 result.sync(); if has_new_files {
188 #[cfg(feature = "failpoints")]
189 crate::utils::failpoint::crash("mace_flush_after_data_dir_sync");
190 }
191 let bucket_frontier = self
192 .manifest
193 .merge_bucket_frontier(bucket_id, &frontier_delta);
194 let mut txn = self.manifest.begin();
195 let (data_stats, blob_stats) = self.update_stat_interval(&mut txn, &mut result);
196
197 for ivl in &result.data_ivls {
198 txn.record(MetaKind::DataInterval, ivl);
199 }
200 for ivl in &result.blob_ivls {
201 txn.record(MetaKind::BlobInterval, ivl);
202 }
203
204 data_stats.iter().for_each(|x| {
205 txn.record(MetaKind::DataStat, x);
206 });
207 blob_stats.iter().for_each(|x| {
208 txn.record(MetaKind::BlobStat, x);
209 });
210
211 txn.record(MetaKind::BucketFrontier, &bucket_frontier);
212 txn.record(MetaKind::Map, &result.map_table);
213 txn.record(MetaKind::Numerics, self.manifest.numerics.deref());
214
215 #[cfg(feature = "failpoints")]
216 if let Err(e) = crate::utils::failpoint::check("mace_flush_before_manifest_commit") {
217 Self::abort_flush_publish("before manifest commit", e);
218 }
219 txn.commit();
220
221 self.manifest.clear_synced_data();
222 self.manifest.clear_synced_blob();
223
224 #[cfg(feature = "failpoints")]
225 if let Err(e) = crate::utils::failpoint::check("mace_flush_after_manifest_commit") {
226 Self::abort_flush_publish("after manifest commit", e);
227 }
228
229 let groups = self.ctx.groups();
230 let sync = self.ctx.opt.sync_on_write;
231 let global_frontier = self.manifest.global_frontier_lower_bound(groups.len());
232
233 for (i, g) in groups.iter().enumerate() {
234 let mut pos = global_frontier[i];
235 if let Some(min) = g.active_txns.min_lsn()
236 && min < pos
237 {
238 pos = min;
239 }
240 let mut lk = g.logging.lock();
241 if lk.update_checkpoint(pos) && sync {
242 let mut f = lk.writer.clone();
243 drop(lk);
244 f.sync();
246 }
247 }
248 }
249}
250
251impl CheckpointObserver for StoreFlushObserver {
252 fn flush_directive(&self, bucket_id: u64) -> FlushDirective {
253 match self.manifest.bucket_states.get(&bucket_id) {
254 Some(state) => {
255 if state.is_deleting() {
256 return FlushDirective::Skip;
257 }
258 FlushDirective::Normal
259 }
260 None => FlushDirective::Skip,
261 }
262 }
263
264 fn stage_unsynced_data_file(&self, file_id: u64) {
265 self.manifest.stage_unsynced_data_file(file_id);
266 }
267
268 fn stage_unsynced_blob_file(&self, file_id: u64) {
269 self.manifest.stage_unsynced_blob_file(file_id);
270 }
271
272 fn stage_orphan_data_file(&self, file_id: u64) {
273 self.manifest.stage_orphan_data_file(file_id);
274 }
275
276 fn stage_orphan_blob_file(&self, file_id: u64) {
277 self.manifest.stage_orphan_blob_file(file_id);
278 }
279
280 fn update_data_mem_interval_stat(&self, ivl: IntervalPair, stat: MemDataStat) {
281 self.manifest.add_data_stat(stat, ivl);
282 }
283
284 fn update_blob_mem_interval_stat(&self, ivl: IntervalPair, stat: MemBlobStat) {
285 self.manifest.add_blob_stat(stat, ivl);
286 }
287
288 fn on_checkpoint(&self, result: FlushResult) {
289 self.publish(result)
290 }
291
292 fn finish_checkpoint(&self) {
293 let h = self.handle.lock();
294 if let Some(h) = h.as_ref() {
295 h.wal_clean(self.ctx);
296 }
297 }
298}
299
300pub struct Store {
301 pub(crate) manifest: Handle<Manifest>,
302 pub(crate) context: Handle<Context>,
303 pub(crate) opt: Arc<ParsedOptions>,
304}
305
306impl Store {
307 pub fn new(opt: Arc<ParsedOptions>, manifest: Handle<Manifest>, ctx: Handle<Context>) -> Self {
308 Self {
309 manifest,
310 context: ctx,
311 opt,
312 }
313 }
314
315 pub(crate) fn start(&self) {
316 self.context.start();
317 }
318
319 pub(crate) fn quit(&self) {
320 let _ = self.context.sync();
322
323 self.manifest.buckets.quit();
326
327 self.context.quit();
329
330 self.context.reclaim();
332 self.manifest.reclaim();
333 }
334}
335
336pub struct Inner {
338 pub(crate) store: MutRef<Store>,
339 pub(crate) gc: GCHandle,
340}
341
342impl Inner {
343 const MAX_BUCKET_NAME_LEN: usize = 32;
344
345 fn new_bucket(this: &Arc<Inner>, name: &str, opt: BucketOptions) -> Result<Bucket, OpCode> {
346 if name.len() >= Self::MAX_BUCKET_NAME_LEN {
347 return Err(OpCode::TooLarge);
348 }
349 let (meta, bucket_ctx) = this.store.manifest.create_bucket(name, opt)?;
350
351 Ok(Bucket {
352 tree: Tree::new(this.store.clone(), ROOT_PID, bucket_ctx),
353 _holder: meta,
354 inner: this.clone(),
355 })
356 }
357
358 fn get_bucket(this: &Arc<Inner>, name: &str) -> Result<Bucket, OpCode> {
359 if name.len() >= Self::MAX_BUCKET_NAME_LEN {
360 return Err(OpCode::TooLarge);
361 }
362 let meta = this.store.manifest.load_bucket_meta(name)?;
363 let bucket_ctx = this.store.manifest.load_bucket_context(meta.id)?;
364
365 Ok(Bucket {
366 tree: Tree::new(this.store.clone(), ROOT_PID, bucket_ctx),
367 _holder: meta,
368 inner: this.clone(),
369 })
370 }
371
372 fn update_bucket_opt(this: &Arc<Inner>, name: &str, opt: BucketOptions) -> Result<(), OpCode> {
373 if name.len() >= Self::MAX_BUCKET_NAME_LEN {
374 return Err(OpCode::TooLarge);
375 }
376 this.store.manifest.update_bucket_options(name, opt)
377 }
378
379 fn drop_bucket(self: &Inner, name: &str) -> Result<(), OpCode> {
381 self.store.context.sync()?;
382 self.store.manifest.unload_bucket(name)
383 }
384
385 fn del_bucket(self: &Inner, name: &str) -> Result<(), OpCode> {
386 self.store.manifest.delete_bucket(name)
387 }
388
389 fn vacuum_bucket(self: &Inner, name: &str) -> Result<VacuumStats, OpCode> {
390 if name.len() >= Self::MAX_BUCKET_NAME_LEN {
391 return Err(OpCode::TooLarge);
392 }
393 let meta = self.store.manifest.load_bucket_meta(name)?;
394 let bucket_ctx = self.store.manifest.load_bucket_context(meta.id)?;
395 crate::store::gc::vacuum_bucket(self.store.clone(), bucket_ctx)
396 }
397
398 fn is_bucket_vacuuming(self: &Inner, name: &str) -> Result<bool, OpCode> {
399 if name.len() >= Self::MAX_BUCKET_NAME_LEN {
400 return Err(OpCode::TooLarge);
401 }
402 let meta = self.store.manifest.load_bucket_meta(name)?;
403 let bucket_ctx = self.store.manifest.load_bucket_context(meta.id)?;
404 Ok(bucket_ctx.state.is_vacuuming())
405 }
406
407 fn vacuum_meta(self: &Inner) -> Result<MetaVacuumStats, OpCode> {
408 self.store.manifest.vacuum_meta(META_VACUUM_TARGET_BYTES)
409 }
410
411 fn checkpoint(&self, bucket_id: u64) {
412 if let Ok(ctx) = self.store.manifest.load_bucket_context(bucket_id) {
413 ctx.checkpoint();
414 }
415 }
416}
417
418impl Drop for Inner {
419 fn drop(&mut self) {
420 self.gc.quit();
421 self.store.quit();
422 }
423}
424
425#[derive(Clone)]
427pub struct Bucket {
428 pub(crate) tree: Tree,
429 pub(crate) _holder: Arc<BucketMeta>,
430 pub(crate) inner: Arc<Inner>,
431}
432
433impl Bucket {
434 pub fn begin(&'_ self) -> Result<TxnKV<'_>, OpCode> {
436 TxnKV::new(&self.inner.store.context, &self.tree)
437 }
438
439 pub fn view(&'_ self) -> Result<TxnView<'_>, OpCode> {
441 TxnView::new(&self.inner.store.context, &self.tree)
442 }
443
444 pub fn checkpoint(&self) {
446 self.inner.checkpoint(self.id());
447 }
448
449 pub fn id(&self) -> u64 {
451 self.tree.bucket_id()
452 }
453
454 pub fn options(&self) -> &Options {
456 &self.inner.store.opt
457 }
458}
459
460impl Deref for Bucket {
461 type Target = Inner;
462
463 fn deref(&self) -> &Self::Target {
464 &self.inner
465 }
466}
467
468#[derive(Clone)]
470pub struct Mace {
471 pub(crate) inner: Arc<Inner>,
472}
473
474impl Mace {
475 pub fn new(opt: ParsedOptions) -> Result<Self, OpCode> {
477 let opt = Arc::new(opt);
478 let (tx, erx) = channel();
479 let (etx, rx) = channel();
480
481 let mut builder = ManifestBuilder::new_with_channels(opt.clone(), tx, rx);
482 builder.load()?;
483 let manifest = Handle::new(builder.finish());
484
485 let mut recover = Recovery::new(opt.clone());
486 let (wal_boot, ctx) = recover.phase1(manifest.numerics.clone())?;
487 let observer = Arc::new(StoreFlushObserver::new(manifest, ctx));
488 let reader = Arc::new(StoreDataReader::new(manifest));
489 manifest.set_context(ctx, reader, observer.clone());
490
491 let store = MutRef::new(Store::new(opt.clone(), manifest, ctx));
492
493 recover.phase2(&wal_boot, store.clone())?;
494 store.start();
495 let handle = start_gc(store.clone(), store.context);
496 observer.attach_handle(handle.clone());
497 let evictor = Evictor::new(
498 opt.clone(),
499 manifest.buckets,
500 manifest.numerics.clone(),
501 erx,
502 etx,
503 );
504 evictor.start();
505
506 Ok(Self {
507 inner: Arc::new(Inner { store, gc: handle }),
508 })
509 }
510
511 pub fn options(&self) -> &Options {
513 &self.inner.store.opt
514 }
515
516 pub fn new_bucket<S: AsRef<str>>(&self, name: S, opt: BucketOptions) -> Result<Bucket, OpCode> {
519 Inner::new_bucket(&self.inner, name.as_ref(), opt.validate())
520 }
521
522 pub fn get_bucket<S: AsRef<str>>(&self, name: S) -> Result<Bucket, OpCode> {
525 Inner::get_bucket(&self.inner, name.as_ref())
526 }
527
528 pub fn update_bucket_opt<S: AsRef<str>>(
535 &self,
536 name: S,
537 opt: BucketOptions,
538 ) -> Result<(), OpCode> {
539 Inner::update_bucket_opt(&self.inner, name.as_ref(), opt.validate())
540 }
541
542 pub fn active_buckets(&self) -> Vec<String> {
544 self.inner.store.manifest.loaded_bucket_names()
545 }
546
547 pub fn drop_bucket<S: AsRef<str>>(&self, name: S) -> Result<(), OpCode> {
549 Inner::drop_bucket(&self.inner, name.as_ref())
550 }
551
552 pub fn del_bucket<S: AsRef<str>>(&self, name: S) -> Result<(), OpCode> {
554 Inner::del_bucket(&self.inner, name.as_ref())
555 }
556
557 pub fn vacuum_bucket<S: AsRef<str>>(&self, name: S) -> Result<VacuumStats, OpCode> {
559 Inner::vacuum_bucket(&self.inner, name.as_ref())
560 }
561
562 pub fn is_bucket_vacuuming<S: AsRef<str>>(&self, name: S) -> Result<bool, OpCode> {
564 Inner::is_bucket_vacuuming(&self.inner, name.as_ref())
565 }
566
567 pub fn vacuum_meta(&self) -> Result<MetaVacuumStats, OpCode> {
569 Inner::vacuum_meta(&self.inner)
570 }
571
572 pub fn disable_gc(&self) {
574 self.inner.gc.pause();
575 }
576
577 pub fn enable_gc(&self) {
579 self.inner.gc.resume();
580 }
581
582 pub fn start_gc(&self) {
584 self.inner.gc.start();
585 }
586
587 pub fn data_gc_count(&self) -> u64 {
589 self.inner.gc.data_gc_count()
590 }
591
592 pub fn blob_gc_count(&self) -> u64 {
594 self.inner.gc.blob_gc_count()
595 }
596
597 pub fn nr_buckets(&self) -> u64 {
599 self.inner
600 .store
601 .manifest
602 .nr_buckets
603 .load(std::sync::atomic::Ordering::Relaxed)
604 }
605
606 pub fn sync(&self) -> Result<(), OpCode> {
608 self.inner.store.context.sync()
609 }
610}