libblobd_direct/
objects.rs

use crate::allocator::AllocDir;
use crate::allocator::Allocator;
use crate::backing_store::BoundedStore;
use crate::backing_store::PartitionStore;
use crate::metrics::BlobdMetrics;
use crate::object::calc_object_layout;
use crate::object::Object;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::pages::Pages;
use crate::tuples::load_raw_tuples_area_from_device;
use crate::tuples::load_tuples_from_raw_tuples_area;
use crate::tuples::tuple_bundles_count;
use crate::tuples::Tuples;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use futures::stream::iter;
use futures::StreamExt;
use off64::u64;
use off64::usz;
use parking_lot::Mutex;
use parking_lot::RwLock;
use serde::Deserialize;
use std::cmp::max;
use std::cmp::min;
use std::collections::BTreeMap;
use std::io::Cursor;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::warn;

pub(crate) type ObjectId = u128;

// Map from object ID to incomplete object. It just so happens that object IDs are also chronological, so this map allows removing objects once they're committed and also popping the oldest entries chronologically.
pub(crate) type IncompleteObjects = Arc<RwLock<BTreeMap<ObjectId, Object>>>;
// Map from object key to the committed object with that key.
pub(crate) type CommittedObjects = Arc<DashMap<Vec<u8>, Object>>;

/// Everything reconstructed from one partition by `load_objects_from_device`.
pub(crate) struct LoadedObjects {
  pub committed_objects: CommittedObjects,
  pub heap_allocator: Allocator,
  pub incomplete_objects: IncompleteObjects,
  pub tuples: Tuples,
}

/// Zeroes the entire tuples area so that a freshly formatted device contains no object tuples.
pub(crate) async fn format_device_for_objects(tuples_area: BoundedStore, pages: &Pages) {
  // We need to erase the entire area so that even when new tuples and bundles are added the end is always ObjectState::_EndOfObjects.
  const BUFSIZE: u64 = 1024 * 1024 * 1024;
  for offset in (0..tuples_area.len()).step_by(usz!(BUFSIZE)) {
    let size = min(tuples_area.len() - offset, BUFSIZE);
    tuples_area
      .write_at(offset, pages.slow_allocate_with_zeros(size))
      .await;
  }
}

/// Progress of initial loading of tuples and object metadata across all partitions.
#[derive(Default)]
pub(crate) struct ClusterLoadProgress {
  pub(crate) objects_loaded: AtomicU64,
  pub(crate) objects_total: AtomicU64,
  pub(crate) partitions_completed: AtomicUsize,
}

/// Loads all objects in one partition: reads every tuple from the tuples area, loads and deserialises each object's metadata from the heap, marks the metadata and data pages as allocated in the heap allocator, and updates object metrics.
pub(crate) async fn load_objects_from_device(
  progress: Arc<ClusterLoadProgress>,
  // This must not be bounded as we'll use raw partition absolute offsets.
  dev: PartitionStore,
  pages: Pages,
  metrics: BlobdMetrics,
  heap_dev_offset: u64,
  heap_size: u64,
) -> LoadedObjects {
  let committed: Arc<CommittedObjects> = Default::default();
  let incomplete: IncompleteObjects = Default::default();
  let heap_allocator = Arc::new(Mutex::new(Allocator::new(
    heap_dev_offset,
    heap_size,
    pages.clone(),
    AllocDir::Right,
    metrics.clone(),
  )));

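  // Read the raw tuples area up front, then group the parsed tuples by bundle so that each bundle's objects can be loaded concurrently below.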
  let mut tuples: Vec<Vec<ObjectTuple>> =
    vec![Vec::new(); usz!(tuple_bundles_count(heap_dev_offset, &pages))];
  let raw_tuples_area = load_raw_tuples_area_from_device(&dev, heap_dev_offset).await;
  load_tuples_from_raw_tuples_area(&raw_tuples_area, &pages, |bundle_id, tuple| {
    tuples[usz!(bundle_id)].push(tuple)
  });

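  // For each bundle, load the metadata of every object referenced by its tuples and rebuild the in-memory maps, allocator state, and metrics.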
  // TODO Tune this concurrency value and make it configurable. Don't overwhelm the system memory or disk I/O queues, but go as fast as possible because this is slow.
  iter(tuples.iter())
    .for_each_concurrent(Some(1048576), |bundle_tuples| {
      let committed = committed.clone();
      let dev = dev.clone();
      let heap_allocator = heap_allocator.clone();
      let incomplete = incomplete.clone();
      let metrics = metrics.clone();
      let pages = pages.clone();
      let progress = progress.clone();

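      // The clones above are moved into the returned future; the closure only borrows the originals, so it can clone them again for the next bundle.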
      async move {
        // We don't need to use for_each_concurrent, as we already have high parallelism in the outer bundles loop, and using it makes the code a bit more complex due to needing locks and Arc clones.
        for tuple in bundle_tuples.iter() {
          progress.objects_total.fetch_add(1, Ordering::Relaxed);
          let object_state = tuple.state;
          let object_id = tuple.id;

          // Do not insert the tuple yet, as we may drop it.

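          // Read the whole metadata page; the serialised metadata may only occupy a prefix of it.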
          let metadata_raw = dev
            .read_at(
              tuple.metadata_dev_offset,
              1 << tuple.metadata_page_size_pow2,
            )
            .await;
          progress.objects_loaded.fetch_add(1, Ordering::Relaxed);
          // Build the deserialiser over a cursor so we can use the rmp_serde::Deserializer::position method afterwards to determine the metadata byte size.
          let mut deserialiser = rmp_serde::Deserializer::new(Cursor::new(metadata_raw));
          // Yes, rmp_serde stops reading once fully deserialised, and doesn't error if there is extra junk afterwards.
          let metadata = ObjectMetadata::deserialize(&mut deserialiser).unwrap();
          let metadata_size = deserialiser.position();
          let object_size = metadata.size;

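          // Recompute the object's on-device layout from its size; the tail page sizes are needed below when marking data pages as allocated.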
          let layout = calc_object_layout(&pages, object_size);

          let obj = Object::new(object_id, object_state, metadata, metadata_size);
          // Check whether we should keep this object before doing anything further, e.g. updating metrics, updating the allocator, pushing the tuple.
          // We'll increment metrics for object counts at the end, in one addition.
          match object_state {
            ObjectState::Incomplete => {
              assert!(incomplete.write().insert(object_id, obj.clone()).is_none());
            }
            ObjectState::Committed => {
              match committed.entry(obj.key.clone()) {
                Entry::Occupied(mut ent) => {
                  let existing_id = ent.get().id();
                  assert_ne!(existing_id, object_id);
                  let key_hex = hex::encode(&obj.key);
                  warn!(
                    key_hex,
                    older_object_id = min(existing_id, object_id),
                    newer_object_id = max(existing_id, object_id),
                    "multiple committed objects found with the same key, will only keep the latest committed one"
                  );
                  if existing_id > object_id {
                    continue;
                  };
                  ent.insert(obj.clone());
                }
                Entry::Vacant(vacant) => {
                  vacant.insert(obj.clone());
                }
              };
            }
            _ => unreachable!(),
          };

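          // This object is being kept, so mark its metadata page and all of its data pages as allocated in the heap allocator.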
          {
            let mut heap_allocator = heap_allocator.lock();
            heap_allocator
              .mark_as_allocated(tuple.metadata_dev_offset, tuple.metadata_page_size_pow2);
            for &page_dev_offset in obj.lpage_dev_offsets.iter() {
              heap_allocator.mark_as_allocated(page_dev_offset, pages.lpage_size_pow2);
            }
            for (i, sz) in layout.tail_page_sizes_pow2 {
              let page_dev_offset = obj.tail_page_dev_offsets[usz!(i)];
              heap_allocator.mark_as_allocated(page_dev_offset, sz);
            }
          }

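          // Byte metrics are updated per object here; the object count metrics are added in bulk once all bundles have been processed.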
          metrics
            .0
            .object_metadata_bytes
            .fetch_add(metadata_size, Ordering::Relaxed);
          metrics
            .0
            .object_data_bytes
            .fetch_add(object_size, Ordering::Relaxed);
        }
      }
    })
    .await;
  progress
    .partitions_completed
    .fetch_add(1, Ordering::Relaxed);
  metrics
    .0
    .incomplete_object_count
    .fetch_add(u64!(incomplete.read().len()), Ordering::Relaxed);
  metrics
    .0
    .committed_object_count
    .fetch_add(u64!(committed.len()), Ordering::Relaxed);

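  // All per-bundle futures have completed, so the temporary Arc and Mutex wrappers can be unwrapped before handing back the loaded state.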
  LoadedObjects {
    committed_objects: Arc::into_inner(committed).unwrap(),
    heap_allocator: Arc::into_inner(heap_allocator).unwrap().into_inner(),
    incomplete_objects: incomplete,
    tuples: Tuples::new(pages, tuples),
  }
}