libblobd_direct/partition/
mod.rs1use crate::backing_store::PartitionStore;
2use crate::ctx::Ctx;
3use crate::metrics::BlobdMetrics;
4use crate::objects::format_device_for_objects;
5use crate::objects::load_objects_from_device;
6use crate::objects::ClusterLoadProgress;
7use crate::objects::LoadedObjects;
8use crate::pages::Pages;
9use crate::tuples::load_raw_tuples_area_from_device;
10use crate::util::ceil_pow2;
11use crate::util::floor_pow2;
12use crate::BlobdCfg;
13use bufpool::buf::Buf;
14use parking_lot::Mutex;
15use std::sync::Arc;
16use tokio::spawn;
17use tracing::info;
18
/// Staged construction of a [`Partition`]: holds the partition's device
/// handle plus the computed on-device layout (tuples area followed by the
/// heap), so the partition can be formatted and/or loaded before any
/// background tasks are started.
pub(crate) struct PartitionLoader {
  pub(crate) dev: PartitionStore,
  // Shared page-size configuration (lpage/spage sizes).
  pages: Pages,
  metrics: BlobdMetrics,
  // Index of this partition within the cluster; propagated into `Ctx`.
  partition_idx: usize,
  // Device offset where the heap begins; everything before it is the
  // reserved object-tuples area.
  heap_dev_offset: u64,
  // Bytes available to the heap allocator (heap end minus heap start).
  heap_size: u64,
}
27
28impl PartitionLoader {
29 pub fn new(
30 partition_idx: usize,
31 partition_store: PartitionStore,
32 cfg: BlobdCfg,
33 pages: Pages,
34 metrics: BlobdMetrics,
35 ) -> Self {
36 let tuples_area_size = ceil_pow2(cfg.object_tuples_area_reserved_space, cfg.lpage_size_pow2);
37 let heap_dev_offset = tuples_area_size;
38 let heap_end = floor_pow2(partition_store.len(), pages.lpage_size_pow2);
39 let heap_size = heap_end - heap_dev_offset;
40 assert!(tuples_area_size + heap_dev_offset <= heap_end);
41
42 info!(
43 partition_number = partition_idx,
44 partition_offset = partition_store.offset(),
45 partition_size = partition_store.len(),
46 heap_dev_offset = heap_dev_offset,
47 heap_size,
48 "init partition",
49 );
50
51 Self {
52 dev: partition_store,
53 heap_dev_offset,
54 heap_size,
55 metrics,
56 pages,
57 partition_idx,
58 }
59 }
60
61 pub async fn format(&self) {
62 format_device_for_objects(self.dev.bounded(0, self.heap_dev_offset), &self.pages).await;
63 self.dev.sync().await;
64 }
65
66 pub async fn load_raw_tuples_area(&self) -> Buf {
67 load_raw_tuples_area_from_device(&self.dev, self.heap_dev_offset).await
68 }
69
70 pub async fn load_and_start(self, load_progress: Arc<ClusterLoadProgress>) -> Partition {
71 let dev = &self.dev;
72 let pages = &self.pages;
73
74 let LoadedObjects {
75 committed_objects,
76 heap_allocator,
77 incomplete_objects,
78 tuples,
79 } = load_objects_from_device(
80 load_progress,
81 dev.clone(),
82 pages.clone(),
83 self.metrics.clone(),
84 self.heap_dev_offset,
85 self.heap_size,
86 )
87 .await;
88
89 let ctx = Arc::new(Ctx {
90 committed_objects,
91 device: self.dev.clone(),
92 heap_allocator: Mutex::new(heap_allocator),
93 incomplete_objects,
94 metrics: self.metrics.clone(),
95 pages: pages.clone(),
96 partition_idx: self.partition_idx,
97 tuples: tuples.clone(),
98 });
99
100 spawn({
101 let dev = dev.bounded(0, self.heap_dev_offset);
102 let pages = pages.clone();
103 async move { tuples.start_background_commit_loop(dev, pages).await }
104 });
105
106 Partition { ctx }
107 }
108}
109
/// A running partition. All live state (object maps, heap allocator, tuples,
/// device handle) is shared through the `Ctx`.
pub(crate) struct Partition {
  pub ctx: Arc<Ctx>,
}