// libblobd_direct/tuples.rs

1use crate::backing_store::BoundedStore;
2use crate::backing_store::PartitionStore;
3use crate::object::ObjectState;
4use crate::object::ObjectTuple;
5use crate::object::OBJECT_TUPLE_SERIALISED_LEN;
6use crate::objects::ObjectId;
7use crate::pages::Pages;
8use ahash::HashMap;
9use bufpool::buf::Buf;
10use futures::stream::iter;
11use futures::StreamExt;
12use itertools::Itertools;
13use num_traits::FromPrimitive;
14use off64::u16;
15use off64::u32;
16use off64::u64;
17use off64::usz;
18use parking_lot::Mutex;
19use roaring::RoaringBitmap;
20use signal_future::SignalFuture;
21use signal_future::SignalFutureController;
22use std::sync::Arc;
23use tokio::time::sleep;
24
/// Index of a tuple bundle; bundle `i` is persisted to the `i`th spage of the tuples area.
pub(crate) type BundleId = u32;
26
/// In-memory index of all object tuples, sharded into per-bundle maps, plus the
/// dirtiness tracking consumed by the background commit loop.
#[derive(Default)]
struct TuplesState {
  // Reverse index: which bundle currently holds each object's tuple.
  object_id_to_bundle: HashMap<ObjectId, BundleId>,
  // Tuples in each bundle, indexed by `BundleId`.
  bundle_tuples: Vec<HashMap<ObjectId, ObjectTuple>>,
  // These bitmaps provide some features:
  // - Quickly find a dirty bundle to add a new tuple to, to try and coalesce writes.
  // - Try to keep picking the same dirty bundle (`.minimum()`) while it's still free to add new tuples to, to further try and coalesce writes. Deletes may interfere with this.
  // - Provide quick set-like insertion and deletion.
  // Any dirty tuple is as good as any other, so there's no need for an ordered (by usage) data structure.
  // Bundles with spare capacity for more tuples.
  free_bundles: RoaringBitmap,
  // Bundles with uncommitted (not yet written to the device) changes.
  dirty_bundles: RoaringBitmap,
  // Intersection of the two sets above, maintained to make the coalescing pick cheap.
  free_and_dirty_bundles: RoaringBitmap,
  // Signalled (i.e. resolved) by the commit loop once all currently-dirty bundles have been written.
  dirty_signals: Vec<SignalFutureController<()>>,
}
41
/// Shared handle to the tuples index; cloning is cheap (shared `Arc` state).
#[derive(Clone)]
pub(crate) struct Tuples {
  state: Arc<Mutex<TuplesState>>,
  // How many serialised tuples fit in one spage-sized bundle.
  max_tuples_per_bundle: u16,
}
47
48impl Tuples {
49  pub fn new(pages: Pages, bundles_with_initial_data: Vec<Vec<ObjectTuple>>) -> Self {
50    let max_tuples_per_bundle = u16!(pages.spage_size() / OBJECT_TUPLE_SERIALISED_LEN);
51
52    let mut state = TuplesState::default();
53    for (bundle_id, tuples_init) in bundles_with_initial_data.into_iter().enumerate() {
54      let bundle_id = u32!(bundle_id);
55      let tuple_count = u16!(tuples_init.len());
56      assert!(tuple_count <= max_tuples_per_bundle);
57      let mut tuples = HashMap::<ObjectId, ObjectTuple>::default();
58      for t in tuples_init {
59        assert!(state.object_id_to_bundle.insert(t.id, bundle_id).is_none());
60        assert!(tuples.insert(t.id, t).is_none());
61      }
62      state.bundle_tuples.push(tuples);
63      if tuple_count < max_tuples_per_bundle {
64        state.free_bundles.insert(bundle_id);
65      };
66    }
67    Self {
68      state: Arc::new(Mutex::new(state)),
69      max_tuples_per_bundle,
70    }
71  }
72
73  pub async fn insert_object(&self, o: ObjectTuple) {
74    let fut = {
75      let mut state = self.state.lock();
76      let bundle_id = state
77        .free_and_dirty_bundles
78        .min()
79        .or_else(|| state.free_bundles.min())
80        .expect("run out of space for tuples");
81      assert!(state.object_id_to_bundle.insert(o.id, bundle_id).is_none());
82      let bundle = &mut state.bundle_tuples[usz!(bundle_id)];
83      assert!(bundle.insert(o.id, o).is_none());
84      assert!(u16!(bundle.len()) <= self.max_tuples_per_bundle);
85      if u16!(bundle.len()) == self.max_tuples_per_bundle {
86        state.free_and_dirty_bundles.remove(bundle_id);
87        state.free_bundles.remove(bundle_id);
88      } else {
89        state.free_and_dirty_bundles.insert(bundle_id);
90      }
91      state.dirty_bundles.insert(bundle_id);
92      let (fut, fut_ctl) = SignalFuture::new();
93      state.dirty_signals.push(fut_ctl);
94      fut
95    };
96    fut.await;
97  }
98
99  pub async fn update_object_id_and_state(
100    &self,
101    cur_object_id: ObjectId,
102    new_object_id: ObjectId,
103    new_object_state: ObjectState,
104  ) {
105    let fut = {
106      let mut state = self.state.lock();
107      let bundle_id = state.object_id_to_bundle.remove(&cur_object_id).unwrap();
108      let bundle = &mut state.bundle_tuples[usz!(bundle_id)];
109      let bundle_len = u16!(bundle.len());
110      let mut tuple = bundle.remove(&cur_object_id).unwrap();
111      tuple.id = new_object_id;
112      tuple.state = new_object_state;
113      assert!(bundle.insert(new_object_id, tuple).is_none());
114      assert!(state
115        .object_id_to_bundle
116        .insert(new_object_id, bundle_id)
117        .is_none());
118      // We didn't add any tuples, but we did update one, making this bundle dirty and eligible for `free_and_dirty_bundles`.
119      if bundle_len < self.max_tuples_per_bundle {
120        state.free_and_dirty_bundles.insert(bundle_id);
121      }
122      state.dirty_bundles.insert(bundle_id);
123      let (fut, fut_ctl) = SignalFuture::new();
124      state.dirty_signals.push(fut_ctl);
125      fut
126    };
127    fut.await;
128  }
129
130  pub async fn delete_object(&self, object_id: ObjectId) -> ObjectTuple {
131    let (tuple, fut) = {
132      let mut state = self.state.lock();
133      let bundle_id = state.object_id_to_bundle.remove(&object_id).unwrap();
134      let tuple = state.bundle_tuples[usz!(bundle_id)]
135        .remove(&object_id)
136        .unwrap();
137      state.free_and_dirty_bundles.insert(bundle_id);
138      state.dirty_bundles.insert(bundle_id);
139      let (fut, fut_ctl) = SignalFuture::new();
140      state.dirty_signals.push(fut_ctl);
141      (tuple, fut)
142    };
143    fut.await;
144    tuple
145  }
146
147  // One loop with some delay works better than one tight loop per bundle (i.e. one async loop and queue per sector size) as this allows for batching, increasing opportunities for sequential and coalesced writes further down the stack (kernel, iSCSI/NVMe, disk controller, etc.). Many random writes of the smallest (i.e. sector) size is NOT peak performance.
148  pub async fn start_background_commit_loop(&self, dev: BoundedStore, pages: Pages) {
149    loop {
150      let mut changes = Vec::new();
151      // TODO Coalesce writes if nearest aligned power of two has high ratio of changed tuples. For example, if bundles 9, 10, 11, 12, 13, and 15 have changed, write bundles 8 to 15 (inclusive) as one page to offset of bundle 8, including bundles 8 and 14 even though they haven't changed.
152      let signals = {
153        let mut state = self.state.lock();
154        for bundle_id in state.dirty_bundles.iter() {
155          let tuples = state.bundle_tuples[usz!(bundle_id)]
156            .values()
157            .cloned()
158            .collect_vec();
159          changes.push((bundle_id, tuples));
160        }
161        state.dirty_bundles.clear();
162        state.free_and_dirty_bundles.clear();
163        state.dirty_signals.drain(..).collect_vec()
164      };
165      if changes.is_empty() {
166        // TODO Tune, allow configuration.
167        sleep(std::time::Duration::from_micros(10)).await;
168        continue;
169      };
170      iter(changes)
171        .for_each_concurrent(None, |(bundle_id, tuples)| {
172          let dev = dev.clone();
173          let pages = pages.clone();
174          async move {
175            let mut page = pages.allocate_uninitialised(pages.spage_size());
176            let bundle_tuple_count = u16!(tuples.len());
177            for (i, t) in tuples.into_iter().enumerate() {
178              let off = i * usz!(OBJECT_TUPLE_SERIALISED_LEN);
179              t.serialise(&mut page[off..off + usz!(OBJECT_TUPLE_SERIALISED_LEN)]);
180            }
181            if bundle_tuple_count < self.max_tuples_per_bundle {
182              page[usz!(bundle_tuple_count) * usz!(OBJECT_TUPLE_SERIALISED_LEN)] =
183                ObjectState::_EndOfBundleTuples as u8;
184            };
185            dev
186              .write_at(u64!(bundle_id) * pages.spage_size(), page)
187              .await;
188          }
189        })
190        .await;
191      for signal in signals {
192        signal.signal(());
193      }
194    }
195  }
196}
197
198pub(crate) async fn load_raw_tuples_area_from_device(
199  dev: &PartitionStore,
200  heap_dev_offset: u64,
201) -> Buf {
202  dev.read_at(0, heap_dev_offset).await
203}
204
205pub(crate) fn tuple_bundles_count(heap_dev_offset: u64, pages: &Pages) -> u64 {
206  assert_eq!(heap_dev_offset % pages.spage_size(), 0);
207  heap_dev_offset / pages.spage_size()
208}
209
210pub(crate) fn load_tuples_from_raw_tuples_area(
211  entire_tuples_area_raw: &[u8],
212  pages: &Pages,
213  mut on_tuple: impl FnMut(BundleId, ObjectTuple),
214) {
215  assert_eq!(entire_tuples_area_raw.len() % usz!(pages.spage_size()), 0);
216  for (bundle_id, raw) in entire_tuples_area_raw
217    .chunks_exact(usz!(pages.spage_size()))
218    .enumerate()
219  {
220    for tuple_raw in raw.chunks_exact(usz!(OBJECT_TUPLE_SERIALISED_LEN)) {
221      let object_state = ObjectState::from_u8(tuple_raw[0]).unwrap();
222      if object_state == ObjectState::_EndOfBundleTuples {
223        break;
224      };
225      let tuple = ObjectTuple::deserialise(tuple_raw);
226      on_tuple(u32!(bundle_id), tuple);
227    }
228  }
229}