libblobd_direct/objects.rs

use crate::allocator::AllocDir;
use crate::allocator::Allocator;
use crate::backing_store::BoundedStore;
use crate::backing_store::PartitionStore;
use crate::metrics::BlobdMetrics;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::pages::Pages;
use crate::tuples::load_raw_tuples_area_from_device;
use crate::tuples::load_tuples_from_raw_tuples_area;
use crate::tuples::tuple_bundles_count;
use crate::tuples::Tuples;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use futures::stream::iter;
use futures::StreamExt;
use off64::u64;
use off64::usz;
use parking_lot::Mutex;
use parking_lot::RwLock;
use serde::Deserialize;
use std::cmp::max;
use std::cmp::min;
use std::collections::BTreeMap;
use std::hash::BuildHasherDefault;
use std::io::Cursor;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tinybuf::TinyBuf;
use tracing::warn;

// Map from object ID to incomplete object. It just so happens that object IDs are also chronological, so this map allows removing objects once they're committed and also popping them chronologically.
pub(crate) type IncompleteObjects = Arc<RwLock<BTreeMap<u64, Object>>>;
// XXH3 should be a much higher quality hash than FxHasher.
pub(crate) type CommittedObjects =
  Arc<DashMap<TinyBuf, Object, BuildHasherDefault<twox_hash::xxh3::Hash64>>>;

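/// In-memory state rebuilt for a partition at startup: the committed and incomplete object maps, a heap allocator with all live pages marked as allocated, and the loaded tuples.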
pub(crate) struct LoadedObjects {
  pub committed_objects: CommittedObjects,
  pub heap_allocator: Allocator,
  pub incomplete_objects: IncompleteObjects,
  pub next_object_id: u64,
  pub tuples: Tuples,
}

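/// Zeroes the entire tuples area so that a freshly formatted device contains no object tuples.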
pub(crate) async fn format_device_for_objects(tuples_area: BoundedStore, pages: &Pages) {
  // We need to erase the entire area so that even when new tuples and bundles are added the end is always ObjectState::_EndOfObjects.
  const BUFSIZE: u64 = 1024 * 1024 * 1024;
  for offset in (0..tuples_area.len()).step_by(usz!(BUFSIZE)) {
    let size = min(tuples_area.len() - offset, BUFSIZE);
    tuples_area
      .write_at(offset, pages.slow_allocate_with_zeros(size))
      .await;
  }
}

/// Progress of initial loading of tuples and object metadata across all partitions.
#[derive(Default)]
pub(crate) struct ClusterLoadProgress {
  pub(crate) objects_loaded: AtomicU64,
  pub(crate) objects_total: AtomicU64,
  pub(crate) partitions_completed: AtomicUsize,
}

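/// Scans the tuples area of one partition, reads each object's metadata from the heap, and rebuilds the in-memory committed/incomplete object maps, heap allocator state, and tuples.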
pub(crate) async fn load_objects_from_device(
  progress: Arc<ClusterLoadProgress>,
  // This must not be bounded as we'll use raw partition absolute offsets.
  dev: PartitionStore,
  pages: Pages,
  metrics: BlobdMetrics,
  heap_dev_offset: u64,
  heap_size: u64,
) -> LoadedObjects {
  let committed: Arc<CommittedObjects> = Default::default();
  let incomplete: Arc<RwLock<BTreeMap<u64, Object>>> = Default::default();
  let next_object_id: Arc<Mutex<u64>> = Default::default();
  let heap_allocator = Arc::new(Mutex::new(Allocator::new(
    heap_dev_offset,
    heap_size,
    pages.clone(),
    AllocDir::Right,
    metrics.clone(),
  )));

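  // One Vec of tuples per bundle, indexed by bundle ID, rebuilt from the raw tuples area.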
  let mut tuples: Vec<Vec<ObjectTuple>> =
    vec![Vec::new(); usz!(tuple_bundles_count(heap_dev_offset, &pages))];
  let raw_tuples_area = load_raw_tuples_area_from_device(&dev, heap_dev_offset).await;
  load_tuples_from_raw_tuples_area(&raw_tuples_area, &pages, |bundle_id, tuple| {
    tuples[usz!(bundle_id)].push(tuple)
  });

  // TODO Tune this concurrency value and make it configurable. Don't overwhelm the system memory or disk I/O queues, but go as fast as possible because this is slow.
  iter(tuples.iter())
    .for_each_concurrent(Some(1048576), |bundle_tuples| {
      let committed = committed.clone();
      let dev = dev.clone();
      let heap_allocator = heap_allocator.clone();
      let incomplete = incomplete.clone();
      let metrics = metrics.clone();
      let next_object_id = next_object_id.clone();
      let pages = pages.clone();
      let progress = progress.clone();

      async move {
        // We don't need to use for_each_concurrent here, as we already have high parallelism in the outer bundles loop, and using it would make the code a bit more complex due to needing locks and Arc clones.
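        // For each tuple: read and deserialise its object metadata, rebuild the in-memory Object, drop stale duplicates of committed keys, mark its heap pages as allocated, and update metrics.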
        for tuple in bundle_tuples.iter() {
          progress.objects_total.fetch_add(1, Ordering::Relaxed);
          let object_state = tuple.state;
          let object_id = tuple.id;

          // Do not insert tuple yet, as we may drop it.

          let metadata_raw = dev
            .read_at(
              tuple.metadata_dev_offset,
              1 << tuple.metadata_page_size_pow2,
            )
            .await;
          progress.objects_loaded.fetch_add(1, Ordering::Relaxed);
          // Use a custom deserialiser so we can use a cursor, which will allow access to the rmp_serde::Deserializer::position method so we can determine the metadata byte size.
          let mut deserialiser = rmp_serde::Deserializer::new(Cursor::new(metadata_raw));
          // Yes, rmp_serde stops reading once fully deserialised, and doesn't error if there is extra junk afterwards.
          let metadata = ObjectMetadata::deserialize(&mut deserialiser).unwrap();
          let metadata_size = deserialiser.position();
          let object_size = metadata.size;

          let layout = calc_object_layout(&pages, object_size);

          let obj = Object::new(object_id, object_state, metadata, metadata_size);
          // Check whether we should keep this object at all before doing anything further, e.g. updating metrics, updating the allocator, pushing the tuple. However, always update next_object_id: skipping a few IDs is harmless, but reusing one is dangerous, since we assert that IDs are unique and duplicate objects may not actually be deleted before we accidentally reuse their IDs.
          // We'll increment metrics for object counts at the end, in one addition.
          {
            let mut next_object_id = next_object_id.lock();
            *next_object_id = max(*next_object_id, object_id + 1);
          };
          match object_state {
            ObjectState::Incomplete => {
              assert!(incomplete.write().insert(object_id, obj.clone()).is_none());
            }
            ObjectState::Committed => {
              match committed.entry(obj.key.clone()) {
                Entry::Occupied(mut ent) => {
                  let existing_id = ent.get().id();
                  assert_ne!(existing_id, object_id);
                  let key_hex = hex::encode(&obj.key);
                  warn!(
                    key_hex,
                    older_object_id = min(existing_id, object_id),
                    newer_object_id = max(existing_id, object_id),
                    "multiple committed objects found with the same key, will only keep the latest committed one"
                  );
                  if existing_id > object_id {
                    continue;
                  };
                  ent.insert(obj.clone());
                }
                Entry::Vacant(vacant) => {
                  vacant.insert(obj.clone());
                }
              };
            }
            _ => unreachable!(),
          };

          {
            let mut heap_allocator = heap_allocator.lock();
            heap_allocator
              .mark_as_allocated(tuple.metadata_dev_offset, tuple.metadata_page_size_pow2);
            for &page_dev_offset in obj.lpage_dev_offsets.iter() {
              heap_allocator.mark_as_allocated(page_dev_offset, pages.lpage_size_pow2);
            }
            for (i, sz) in layout.tail_page_sizes_pow2 {
              let page_dev_offset = obj.tail_page_dev_offsets[usz!(i)];
              heap_allocator.mark_as_allocated(page_dev_offset, sz);
            }
          }

          metrics
            .0
            .object_metadata_bytes
            .fetch_add(metadata_size, Ordering::Relaxed);
          metrics
            .0
            .object_data_bytes
            .fetch_add(object_size, Ordering::Relaxed);
        }
      }
    })
    .await;
  progress
    .partitions_completed
    .fetch_add(1, Ordering::Relaxed);
  metrics
    .0
    .incomplete_object_count
    .fetch_add(u64!(incomplete.read().len()), Ordering::Relaxed);
  metrics
    .0
    .committed_object_count
    .fetch_add(u64!(committed.len()), Ordering::Relaxed);

  LoadedObjects {
    committed_objects: Arc::into_inner(committed).unwrap(),
    heap_allocator: Arc::into_inner(heap_allocator).unwrap().into_inner(),
    incomplete_objects: incomplete,
    next_object_id: Arc::into_inner(next_object_id).unwrap().into_inner(),
    tuples: Tuples::new(pages, tuples),
  }
}