libblobd_direct/objects.rs

use crate::allocator::AllocDir;
use crate::allocator::Allocator;
use crate::backing_store::BoundedStore;
use crate::backing_store::PartitionStore;
use crate::metrics::BlobdMetrics;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::pages::Pages;
use crate::tuples::load_raw_tuples_area_from_device;
use crate::tuples::load_tuples_from_raw_tuples_area;
use crate::tuples::tuple_bundles_count;
use crate::tuples::Tuples;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use futures::stream::iter;
use futures::StreamExt;
use off64::u64;
use off64::usz;
use parking_lot::Mutex;
use parking_lot::RwLock;
use serde::Deserialize;
use std::cmp::max;
use std::cmp::min;
use std::collections::BTreeMap;
use std::hash::BuildHasherDefault;
use std::io::Cursor;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tracing::warn;

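// In-memory indexes of objects discovered on the device: incomplete objects are
// keyed by object ID, committed objects by object key (hashed with XXH3).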
pub(crate) type IncompleteObjects = Arc<RwLock<BTreeMap<u64, Object>>>;
pub(crate) type CommittedObjects =
  Arc<DashMap<TinyBuf, Object, BuildHasherDefault<twox_hash::xxh3::Hash64>>>;

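/// Everything reconstructed from a partition by `load_objects_from_device`.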
pub(crate) struct LoadedObjects {
  pub committed_objects: CommittedObjects,
  pub heap_allocator: Allocator,
  pub incomplete_objects: IncompleteObjects,
  pub next_object_id: u64,
  pub tuples: Tuples,
}

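/// Zeroes the entire tuples area in 1 GiB chunks so a freshly formatted device
/// contains no stale tuples.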
pub(crate) async fn format_device_for_objects(tuples_area: BoundedStore, pages: &Pages) {
  const BUFSIZE: u64 = 1024 * 1024 * 1024;
  for offset in (0..tuples_area.len()).step_by(usz!(BUFSIZE)) {
    let size = min(tuples_area.len() - offset, BUFSIZE);
    tuples_area
      .write_at(offset, pages.slow_allocate_with_zeros(size))
      .await;
  }
}

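/// Shared counters used to report progress while partitions are loaded concurrently.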
#[derive(Default)]
pub(crate) struct ClusterLoadProgress {
  pub(crate) objects_loaded: AtomicU64,
  pub(crate) objects_total: AtomicU64,
  pub(crate) partitions_completed: AtomicUsize,
}

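/// Scans the tuples area of one partition, reads and deserialises each object's
/// metadata, rebuilds the incomplete/committed indexes and the heap allocator's
/// view of used pages, and updates metrics and load progress.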
pub(crate) async fn load_objects_from_device(
  progress: Arc<ClusterLoadProgress>,
  dev: PartitionStore,
  pages: Pages,
  metrics: BlobdMetrics,
  heap_dev_offset: u64,
  heap_size: u64,
) -> LoadedObjects {
  let committed: Arc<CommittedObjects> = Default::default();
  let incomplete: Arc<RwLock<BTreeMap<u64, Object>>> = Default::default();
  let next_object_id: Arc<Mutex<u64>> = Default::default();
  let heap_allocator = Arc::new(Mutex::new(Allocator::new(
    heap_dev_offset,
    heap_size,
    pages.clone(),
    AllocDir::Right,
    metrics.clone(),
  )));

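  // Read the raw tuples area from the device and group its tuples by bundle.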
  let mut tuples: Vec<Vec<ObjectTuple>> =
    vec![Vec::new(); usz!(tuple_bundles_count(heap_dev_offset, &pages))];
  let raw_tuples_area = load_raw_tuples_area_from_device(&dev, heap_dev_offset).await;
  load_tuples_from_raw_tuples_area(&raw_tuples_area, &pages, |bundle_id, tuple| {
    tuples[usz!(bundle_id)].push(tuple)
  });

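  // Load object metadata for all bundles concurrently; each task handles one
  // bundle's tuples.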
  iter(tuples.iter())
    .for_each_concurrent(Some(1048576), |bundle_tuples| {
      let committed = committed.clone();
      let dev = dev.clone();
      let heap_allocator = heap_allocator.clone();
      let incomplete = incomplete.clone();
      let metrics = metrics.clone();
      let next_object_id = next_object_id.clone();
      let pages = pages.clone();
      let progress = progress.clone();

      async move {
        for tuple in bundle_tuples.iter() {
          progress.objects_total.fetch_add(1, Ordering::Relaxed);
          let object_state = tuple.state;
          let object_id = tuple.id;

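          // Read the object's serialised metadata from the device and decode it
          // to recover its size, key and page offsets.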
          let metadata_raw = dev
            .read_at(
              tuple.metadata_dev_offset,
              1 << tuple.metadata_page_size_pow2,
            )
            .await;
          progress.objects_loaded.fetch_add(1, Ordering::Relaxed);
          let mut deserialiser = rmp_serde::Deserializer::new(Cursor::new(metadata_raw));
          let metadata = ObjectMetadata::deserialize(&mut deserialiser).unwrap();
          let metadata_size = deserialiser.position();
          let object_size = metadata.size;

          let layout = calc_object_layout(&pages, object_size);

          let obj = Object::new(object_id, object_state, metadata, metadata_size);
          {
            let mut next_object_id = next_object_id.lock();
            *next_object_id = max(*next_object_id, object_id + 1);
          };
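          // Index the object by its persisted state. If multiple committed
          // objects share a key, only the one with the highest ID is kept.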
          match object_state {
            ObjectState::Incomplete => {
              assert!(incomplete.write().insert(object_id, obj.clone()).is_none());
            }
            ObjectState::Committed => {
              match committed.entry(obj.key.clone()) {
                Entry::Occupied(mut ent) => {
                  let existing_id = ent.get().id();
                  assert_ne!(existing_id, object_id);
                  let key_hex = hex::encode(&obj.key);
                  warn!(
                    key_hex,
                    older_object_id = min(existing_id, object_id),
                    newer_object_id = max(existing_id, object_id),
                    "multiple committed objects found with the same key, will only keep the latest committed one"
                  );
                  if existing_id > object_id {
                    continue;
                  };
                  ent.insert(obj.clone());
                }
                Entry::Vacant(vacant) => {
                  vacant.insert(obj.clone());
                }
              };
            }
            _ => unreachable!(),
          };

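          // Mark the metadata page and every data page of this object as
          // allocated so the heap allocator will not hand them out again.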
          {
            let mut heap_allocator = heap_allocator.lock();
            heap_allocator
              .mark_as_allocated(tuple.metadata_dev_offset, tuple.metadata_page_size_pow2);
            for &page_dev_offset in obj.lpage_dev_offsets.iter() {
              heap_allocator.mark_as_allocated(page_dev_offset, pages.lpage_size_pow2);
            }
            for (i, sz) in layout.tail_page_sizes_pow2 {
              let page_dev_offset = obj.tail_page_dev_offsets[usz!(i)];
              heap_allocator.mark_as_allocated(page_dev_offset, sz);
            }
          }

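          // Account for this object's metadata and data bytes in the metrics.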
          metrics
            .0
            .object_metadata_bytes
            .fetch_add(metadata_size, Ordering::Relaxed);
          metrics
            .0
            .object_data_bytes
            .fetch_add(object_size, Ordering::Relaxed);
        }
      }
    })
    .await;
  progress
    .partitions_completed
    .fetch_add(1, Ordering::Relaxed);
  metrics
    .0
    .incomplete_object_count
    .fetch_add(u64!(incomplete.read().len()), Ordering::Relaxed);
  metrics
    .0
    .committed_object_count
    .fetch_add(u64!(committed.len()), Ordering::Relaxed);

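  // All concurrent load tasks have completed, so each Arc has a single owner
  // left and can be unwrapped.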
  LoadedObjects {
    committed_objects: Arc::into_inner(committed).unwrap(),
    heap_allocator: Arc::into_inner(heap_allocator).unwrap().into_inner(),
    incomplete_objects: incomplete,
    next_object_id: Arc::into_inner(next_object_id).unwrap().into_inner(),
    tuples: Tuples::new(pages, tuples),
  }
}