libblobd_direct/partition/
mod.rs1use crate::backing_store::PartitionStore;
2use crate::ctx::Ctx;
3use crate::metrics::BlobdMetrics;
4use crate::objects::format_device_for_objects;
5use crate::objects::load_objects_from_device;
6use crate::objects::ClusterLoadProgress;
7use crate::objects::LoadedObjects;
8use crate::pages::Pages;
9use crate::tuples::load_raw_tuples_area_from_device;
10use crate::util::ceil_pow2;
11use crate::util::floor_pow2;
12use crate::BlobdCfg;
13use bufpool::buf::Buf;
14use parking_lot::Mutex;
15use std::sync::atomic::AtomicU64;
16use std::sync::Arc;
17use tokio::spawn;
18use tracing::info;
19
20pub(crate) struct PartitionLoader {
21 pub(crate) dev: PartitionStore,
22 pages: Pages,
23 metrics: BlobdMetrics,
24 partition_idx: usize,
25 heap_dev_offset: u64,
26 heap_size: u64,
27}
28
29impl PartitionLoader {
30 pub fn new(
31 partition_idx: usize,
32 partition_store: PartitionStore,
33 cfg: BlobdCfg,
34 pages: Pages,
35 metrics: BlobdMetrics,
36 ) -> Self {
37 let tuples_area_size = ceil_pow2(cfg.object_tuples_area_reserved_space, cfg.lpage_size_pow2);
38 let heap_dev_offset = tuples_area_size;
39 let heap_end = floor_pow2(partition_store.len(), pages.lpage_size_pow2);
40 let heap_size = heap_end - heap_dev_offset;
41 assert!(tuples_area_size + heap_dev_offset <= heap_end);
42
43 info!(
44 partition_number = partition_idx,
45 partition_offset = partition_store.offset(),
46 partition_size = partition_store.len(),
47 heap_dev_offset = heap_dev_offset,
48 heap_size,
49 "init partition",
50 );
51
52 Self {
53 dev: partition_store,
54 heap_dev_offset,
55 heap_size,
56 metrics,
57 pages,
58 partition_idx,
59 }
60 }
61
62 pub async fn format(&self) {
63 format_device_for_objects(self.dev.bounded(0, self.heap_dev_offset), &self.pages).await;
64 self.dev.sync().await;
65 }
66
67 pub async fn load_raw_tuples_area(&self) -> Buf {
68 load_raw_tuples_area_from_device(&self.dev, self.heap_dev_offset).await
69 }
70
71 pub async fn load_and_start(self, load_progress: Arc<ClusterLoadProgress>) -> Partition {
72 let dev = &self.dev;
73 let pages = &self.pages;
74
75 let LoadedObjects {
76 committed_objects,
77 heap_allocator,
78 incomplete_objects,
79 next_object_id,
80 tuples,
81 } = load_objects_from_device(
82 load_progress,
83 dev.clone(),
84 pages.clone(),
85 self.metrics.clone(),
86 self.heap_dev_offset,
87 self.heap_size,
88 )
89 .await;
90
91 let ctx = Arc::new(Ctx {
92 committed_objects,
93 device: self.dev.clone(),
94 heap_allocator: Mutex::new(heap_allocator),
95 incomplete_objects,
96 metrics: self.metrics.clone(),
97 next_object_id: AtomicU64::new(next_object_id),
98 pages: pages.clone(),
99 partition_idx: self.partition_idx,
100 tuples: tuples.clone(),
101 });
102
103 spawn({
104 let dev = dev.bounded(0, self.heap_dev_offset);
105 let pages = pages.clone();
106 async move { tuples.start_background_commit_loop(dev, pages).await }
107 });
108
109 Partition { ctx }
110 }
111}
112
113pub(crate) struct Partition {
114 pub ctx: Arc<Ctx>,
115}