libblobd_direct/partition/mod.rs

use crate::backing_store::PartitionStore;
use crate::ctx::Ctx;
use crate::metrics::BlobdMetrics;
use crate::objects::format_device_for_objects;
use crate::objects::load_objects_from_device;
use crate::objects::ClusterLoadProgress;
use crate::objects::LoadedObjects;
use crate::pages::Pages;
use crate::tuples::load_raw_tuples_area_from_device;
use crate::util::ceil_pow2;
use crate::util::floor_pow2;
use crate::BlobdCfg;
use bufpool::buf::Buf;
use parking_lot::Mutex;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::spawn;
use tracing::info;

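/// Computes a partition's on-device layout and provides the entry points to
/// format it, read back its raw tuples area, or load it into a running
/// [`Partition`].
///
/// A minimal sketch of the expected call sequence (variable names here are
/// illustrative, not part of this module; `format` is only needed when
/// creating a fresh device):
///
/// ```ignore
/// let loader = PartitionLoader::new(idx, store, cfg, pages, metrics);
/// loader.format().await;
/// let partition = loader.load_and_start(progress).await;
/// ```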
pub(crate) struct PartitionLoader {
  pub(crate) dev: PartitionStore,
  pages: Pages,
  metrics: BlobdMetrics,
  partition_idx: usize,
  heap_dev_offset: u64,
  heap_size: u64,
}

impl PartitionLoader {
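  /// Derives the layout: the object tuples area starts at device offset 0,
  /// rounded up to a whole lpage, and the heap spans the remaining
  /// lpage-aligned space.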
  pub fn new(
    partition_idx: usize,
    partition_store: PartitionStore,
    cfg: BlobdCfg,
    pages: Pages,
    metrics: BlobdMetrics,
  ) -> Self {
    let tuples_area_size = ceil_pow2(cfg.object_tuples_area_reserved_space, cfg.lpage_size_pow2);
    let heap_dev_offset = tuples_area_size;
    let heap_end = floor_pow2(partition_store.len(), pages.lpage_size_pow2);
    // Assert before subtracting so that an undersized partition fails with a
    // clear panic instead of a u64 underflow. Since `heap_dev_offset` equals
    // `tuples_area_size`, this also requires the heap to be at least as large
    // as the tuples area.
    assert!(tuples_area_size + heap_dev_offset <= heap_end);
    let heap_size = heap_end - heap_dev_offset;

    info!(
      partition_number = partition_idx,
      partition_offset = partition_store.offset(),
      partition_size = partition_store.len(),
      heap_dev_offset,
      heap_size,
      "init partition",
    );

    Self {
      dev: partition_store,
      heap_dev_offset,
      heap_size,
      metrics,
      pages,
      partition_idx,
    }
  }

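  /// Formats the tuples area (everything before the heap) for objects, then
  /// flushes the writes to the device.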
  pub async fn format(&self) {
    format_device_for_objects(self.dev.bounded(0, self.heap_dev_offset), &self.pages).await;
    self.dev.sync().await;
  }

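  /// Reads back the raw, unparsed tuples area: the first `heap_dev_offset`
  /// bytes of the partition.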
  pub async fn load_raw_tuples_area(&self) -> Buf {
    load_raw_tuples_area_from_device(&self.dev, self.heap_dev_offset).await
  }

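  /// Loads all object state from the device, assembles the shared [`Ctx`],
  /// spawns the background tuple-commit loop, and returns the running
  /// partition. Consumes the loader.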
  pub async fn load_and_start(self, load_progress: Arc<ClusterLoadProgress>) -> Partition {
    let dev = &self.dev;
    let pages = &self.pages;

    let LoadedObjects {
      committed_objects,
      heap_allocator,
      incomplete_objects,
      next_object_id,
      tuples,
    } = load_objects_from_device(
      load_progress,
      dev.clone(),
      pages.clone(),
      self.metrics.clone(),
      self.heap_dev_offset,
      self.heap_size,
    )
    .await;

    let ctx = Arc::new(Ctx {
      committed_objects,
      device: self.dev.clone(),
      heap_allocator: Mutex::new(heap_allocator),
      incomplete_objects,
      metrics: self.metrics.clone(),
      next_object_id: AtomicU64::new(next_object_id),
      pages: pages.clone(),
      partition_idx: self.partition_idx,
      tuples: tuples.clone(),
    });

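    // Commit tuple mutations in the background. The device handle the loop
    // receives is bounded to the tuples area, [0, heap_dev_offset).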
    spawn({
      let dev = dev.bounded(0, self.heap_dev_offset);
      let pages = pages.clone();
      async move { tuples.start_background_commit_loop(dev, pages).await }
    });

    Partition { ctx }
  }
}

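/// A loaded, running partition whose state is shared through its [`Ctx`].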
pub(crate) struct Partition {
  pub ctx: Arc<Ctx>,
}