libblobd_direct/objects.rs

use crate::allocator::AllocDir;
use crate::allocator::Allocator;
use crate::backing_store::BoundedStore;
use crate::backing_store::PartitionStore;
use crate::metrics::BlobdMetrics;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::pages::Pages;
use crate::tuples::load_raw_tuples_area_from_device;
use crate::tuples::load_tuples_from_raw_tuples_area;
use crate::tuples::tuple_bundles_count;
use crate::tuples::Tuples;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use futures::stream::iter;
use futures::StreamExt;
use off64::u64;
use off64::usz;
use parking_lot::Mutex;
use parking_lot::RwLock;
use serde::Deserialize;
use std::cmp::max;
use std::cmp::min;
use std::collections::BTreeMap;
use std::io::Cursor;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::warn;
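// Incomplete objects are indexed by ID in an ordered map behind a RwLock, while committed
// objects are keyed by their key bytes in a concurrent DashMap (presumably so committed
// lookups can proceed without a global lock).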
pub(crate) type ObjectId = u128;

pub(crate) type IncompleteObjects = Arc<RwLock<BTreeMap<ObjectId, Object>>>;
pub(crate) type CommittedObjects = Arc<DashMap<Vec<u8>, Object>>;
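/// Everything reconstructed from a partition at startup: both object indices, the heap
/// allocator with all live pages marked as used, and the in-memory tuple bundles.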
pub(crate) struct LoadedObjects {
  pub committed_objects: CommittedObjects,
  pub heap_allocator: Allocator,
  pub incomplete_objects: IncompleteObjects,
  pub tuples: Tuples,
}
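/// Zeroes the tuples area so a freshly formatted device starts with no stale tuples.
/// Writes are issued in chunks of at most `BUFSIZE` (1 GiB), presumably to bound the size
/// of the zeroed buffer allocated per write.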
pub(crate) async fn format_device_for_objects(tuples_area: BoundedStore, pages: &Pages) {
  const BUFSIZE: u64 = 1024 * 1024 * 1024;
  for offset in (0..tuples_area.len()).step_by(usz!(BUFSIZE)) {
    let size = min(tuples_area.len() - offset, BUFSIZE);
    tuples_area
      .write_at(offset, pages.slow_allocate_with_zeros(size))
      .await;
  }
}
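/// Shared counters used to report load progress across all partitions of a cluster.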
#[derive(Default)]
pub(crate) struct ClusterLoadProgress {
  pub(crate) objects_loaded: AtomicU64,
  pub(crate) objects_total: AtomicU64,
  pub(crate) partitions_completed: AtomicUsize,
}
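/// Rebuilds the in-memory state of one partition: reads the raw tuples area, loads each
/// object's metadata from the device, indexes the object as incomplete or committed, and
/// marks its metadata and data pages as allocated in the heap allocator.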
pub(crate) async fn load_objects_from_device(
  progress: Arc<ClusterLoadProgress>,
  dev: PartitionStore,
  pages: Pages,
  metrics: BlobdMetrics,
  heap_dev_offset: u64,
  heap_size: u64,
) -> LoadedObjects {
  let committed: Arc<CommittedObjects> = Default::default();
  let incomplete: Arc<RwLock<BTreeMap<ObjectId, Object>>> = Default::default();
  let heap_allocator = Arc::new(Mutex::new(Allocator::new(
    heap_dev_offset,
    heap_size,
    pages.clone(),
    AllocDir::Right,
    metrics.clone(),
  )));
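  // Read the raw tuples area in one pass, then group tuples by bundle so each bundle can be
  // processed as a unit below.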
  let mut tuples: Vec<Vec<ObjectTuple>> =
    vec![Vec::new(); usz!(tuple_bundles_count(heap_dev_offset, &pages))];
  let raw_tuples_area = load_raw_tuples_area_from_device(&dev, heap_dev_offset).await;
  load_tuples_from_raw_tuples_area(&raw_tuples_area, &pages, |bundle_id, tuple| {
    tuples[usz!(bundle_id)].push(tuple)
  });
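  // Load object metadata for all bundles concurrently, capped at 1048576 bundles in flight.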
  iter(tuples.iter()).for_each_concurrent(Some(1048576), |bundle_tuples| {
    let committed = committed.clone();
    let dev = dev.clone();
    let heap_allocator = heap_allocator.clone();
    let incomplete = incomplete.clone();
    let metrics = metrics.clone();
    let pages = pages.clone();
    let progress = progress.clone();
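    // Tuples within a bundle are processed sequentially; parallelism comes from many
    // bundles being processed at once.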
    async move {
      for tuple in bundle_tuples.iter() {
        progress.objects_total.fetch_add(1, Ordering::Relaxed);
        let object_state = tuple.state;
        let object_id = tuple.id;

        let metadata_raw = dev
          .read_at(
            tuple.metadata_dev_offset,
            1 << tuple.metadata_page_size_pow2,
          )
          .await;
        progress.objects_loaded.fetch_add(1, Ordering::Relaxed);
        let mut deserialiser = rmp_serde::Deserializer::new(Cursor::new(metadata_raw));
        let metadata = ObjectMetadata::deserialize(&mut deserialiser).unwrap();
        let metadata_size = deserialiser.position();
        let object_size = metadata.size;

        let layout = calc_object_layout(&pages, object_size);

        let obj = Object::new(object_id, object_state, metadata, metadata_size);
        match object_state {
          ObjectState::Incomplete => {
            assert!(incomplete.write().insert(object_id, obj.clone()).is_none());
          }
          ObjectState::Committed => {
            match committed.entry(obj.key.clone()) {
              Entry::Occupied(mut ent) => {
                let existing_id = ent.get().id();
                assert_ne!(existing_id, object_id);
                let key_hex = hex::encode(&obj.key);
                warn!(
                  key_hex,
                  older_object_id = min(existing_id, object_id),
                  newer_object_id = max(existing_id, object_id),
                  "multiple committed objects found with the same key, will only keep the latest committed one"
                );
                if existing_id > object_id {
                  continue;
                };
                ent.insert(obj.clone());
              }
              Entry::Vacant(vacant) => {
                vacant.insert(obj.clone());
              }
            };
          }
          _ => unreachable!(),
        };
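        // Mark every page backing this object (metadata page, full lpages, and tail pages)
        // as allocated so the heap allocator will not hand them out again.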
        {
          let mut heap_allocator = heap_allocator.lock();
          heap_allocator.mark_as_allocated(tuple.metadata_dev_offset, tuple.metadata_page_size_pow2);
          for &page_dev_offset in obj.lpage_dev_offsets.iter() {
            heap_allocator.mark_as_allocated(page_dev_offset, pages.lpage_size_pow2);
          }
          for (i, sz) in layout.tail_page_sizes_pow2 {
            let page_dev_offset = obj.tail_page_dev_offsets[usz!(i)];
            heap_allocator.mark_as_allocated(page_dev_offset, sz);
          }
        }
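        // Account for this object's metadata and data bytes in the global metrics.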
        metrics
          .0
          .object_metadata_bytes
          .fetch_add(metadata_size, Ordering::Relaxed);
        metrics
          .0
          .object_data_bytes
          .fetch_add(object_size, Ordering::Relaxed);
      }
    }
  })
  .await;
  progress
    .partitions_completed
    .fetch_add(1, Ordering::Relaxed);
  metrics
    .0
    .incomplete_object_count
    .fetch_add(u64!(incomplete.read().len()), Ordering::Relaxed);
  metrics
    .0
    .committed_object_count
    .fetch_add(u64!(committed.len()), Ordering::Relaxed);
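  // All concurrent loading has finished, so the temporary Arc wrappers can be unwrapped to
  // return exclusive ownership of the committed map and the heap allocator.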
  LoadedObjects {
    committed_objects: Arc::into_inner(committed).unwrap(),
    heap_allocator: Arc::into_inner(heap_allocator).unwrap().into_inner(),
    incomplete_objects: incomplete,
    tuples: Tuples::new(pages, tuples),
  }
}