libblobd_direct/partition/
mod.rs

1use crate::backing_store::PartitionStore;
2use crate::ctx::Ctx;
3use crate::metrics::BlobdMetrics;
4use crate::objects::format_device_for_objects;
5use crate::objects::load_objects_from_device;
6use crate::objects::ClusterLoadProgress;
7use crate::objects::LoadedObjects;
8use crate::pages::Pages;
9use crate::tuples::load_raw_tuples_area_from_device;
10use crate::util::ceil_pow2;
11use crate::util::floor_pow2;
12use crate::BlobdCfg;
13use bufpool::buf::Buf;
14use parking_lot::Mutex;
15use std::sync::Arc;
16use tokio::spawn;
17use tracing::info;
18
19pub(crate) struct PartitionLoader {
20  pub(crate) dev: PartitionStore,
21  pages: Pages,
22  metrics: BlobdMetrics,
23  partition_idx: usize,
24  heap_dev_offset: u64,
25  heap_size: u64,
26}
27
28impl PartitionLoader {
29  pub fn new(
30    partition_idx: usize,
31    partition_store: PartitionStore,
32    cfg: BlobdCfg,
33    pages: Pages,
34    metrics: BlobdMetrics,
35  ) -> Self {
36    let tuples_area_size = ceil_pow2(cfg.object_tuples_area_reserved_space, cfg.lpage_size_pow2);
37    let heap_dev_offset = tuples_area_size;
38    let heap_end = floor_pow2(partition_store.len(), pages.lpage_size_pow2);
39    let heap_size = heap_end - heap_dev_offset;
40    assert!(tuples_area_size + heap_dev_offset <= heap_end);
41
42    info!(
43      partition_number = partition_idx,
44      partition_offset = partition_store.offset(),
45      partition_size = partition_store.len(),
46      heap_dev_offset = heap_dev_offset,
47      heap_size,
48      "init partition",
49    );
50
51    Self {
52      dev: partition_store,
53      heap_dev_offset,
54      heap_size,
55      metrics,
56      pages,
57      partition_idx,
58    }
59  }
60
61  pub async fn format(&self) {
62    format_device_for_objects(self.dev.bounded(0, self.heap_dev_offset), &self.pages).await;
63    self.dev.sync().await;
64  }
65
66  pub async fn load_raw_tuples_area(&self) -> Buf {
67    load_raw_tuples_area_from_device(&self.dev, self.heap_dev_offset).await
68  }
69
70  pub async fn load_and_start(self, load_progress: Arc<ClusterLoadProgress>) -> Partition {
71    let dev = &self.dev;
72    let pages = &self.pages;
73
74    let LoadedObjects {
75      committed_objects,
76      heap_allocator,
77      incomplete_objects,
78      tuples,
79    } = load_objects_from_device(
80      load_progress,
81      dev.clone(),
82      pages.clone(),
83      self.metrics.clone(),
84      self.heap_dev_offset,
85      self.heap_size,
86    )
87    .await;
88
89    let ctx = Arc::new(Ctx {
90      committed_objects,
91      device: self.dev.clone(),
92      heap_allocator: Mutex::new(heap_allocator),
93      incomplete_objects,
94      metrics: self.metrics.clone(),
95      pages: pages.clone(),
96      partition_idx: self.partition_idx,
97      tuples: tuples.clone(),
98    });
99
100    spawn({
101      let dev = dev.bounded(0, self.heap_dev_offset);
102      let pages = pages.clone();
103      async move { tuples.start_background_commit_loop(dev, pages).await }
104    });
105
106    Partition { ctx }
107  }
108}
109
110pub(crate) struct Partition {
111  pub ctx: Arc<Ctx>,
112}