//! libblobd_direct/tuples.rs

use crate::backing_store::BoundedStore;
2use crate::backing_store::PartitionStore;
3use crate::object::ObjectState;
4use crate::object::ObjectTuple;
5use crate::object::OBJECT_TUPLE_SERIALISED_LEN;
6use crate::objects::ObjectId;
7use crate::pages::Pages;
8use ahash::HashMap;
9use bufpool::buf::Buf;
10use futures::stream::iter;
11use futures::StreamExt;
12use itertools::Itertools;
13use num_traits::FromPrimitive;
14use off64::u16;
15use off64::u32;
16use off64::u64;
17use off64::usz;
18use parking_lot::Mutex;
19use roaring::RoaringBitmap;
20use signal_future::SignalFuture;
21use signal_future::SignalFutureController;
22use std::sync::Arc;
23use tokio::time::sleep;
24
// Index of one bundle (a spage-sized group of serialised tuples) within the tuples area.
pub(crate) type BundleId = u32;
26
/// In-memory index of all object tuples, sharded into fixed-size bundles.
/// Protected by the `Mutex` in `Tuples`; mutated by the insert/update/delete
/// methods and flushed by the background commit loop.
#[derive(Default)]
struct TuplesState {
  // Which bundle each live object's tuple currently resides in.
  object_id_to_bundle: HashMap<ObjectId, BundleId>,
  // Tuples per bundle, indexed by `BundleId`.
  bundle_tuples: Vec<HashMap<ObjectId, ObjectTuple>>,
  // Bundles with at least one spare tuple slot (populated in `new`).
  free_bundles: RoaringBitmap,
  // Bundles with changes not yet persisted by the commit loop.
  dirty_bundles: RoaringBitmap,
  // Dirty bundles that also have spare slots; `insert_object` prefers these
  // so pending page writes absorb as many mutations as possible.
  free_and_dirty_bundles: RoaringBitmap,
  // Waiters to wake once the next commit round has persisted their mutation.
  dirty_signals: Vec<SignalFutureController<()>>,
}
41
/// Cloneable handle to the shared tuple index; all clones share `state`.
#[derive(Clone)]
pub(crate) struct Tuples {
  state: Arc<Mutex<TuplesState>>,
  // spage_size / OBJECT_TUPLE_SERIALISED_LEN — slots per bundle page.
  max_tuples_per_bundle: u16,
}
47
48impl Tuples {
49 pub fn new(pages: Pages, bundles_with_initial_data: Vec<Vec<ObjectTuple>>) -> Self {
50 let max_tuples_per_bundle = u16!(pages.spage_size() / OBJECT_TUPLE_SERIALISED_LEN);
51
52 let mut state = TuplesState::default();
53 for (bundle_id, tuples_init) in bundles_with_initial_data.into_iter().enumerate() {
54 let bundle_id = u32!(bundle_id);
55 let tuple_count = u16!(tuples_init.len());
56 assert!(tuple_count <= max_tuples_per_bundle);
57 let mut tuples = HashMap::<ObjectId, ObjectTuple>::default();
58 for t in tuples_init {
59 assert!(state.object_id_to_bundle.insert(t.id, bundle_id).is_none());
60 assert!(tuples.insert(t.id, t).is_none());
61 }
62 state.bundle_tuples.push(tuples);
63 if tuple_count < max_tuples_per_bundle {
64 state.free_bundles.insert(bundle_id);
65 };
66 }
67 Self {
68 state: Arc::new(Mutex::new(state)),
69 max_tuples_per_bundle,
70 }
71 }
72
73 pub async fn insert_object(&self, o: ObjectTuple) {
74 let fut = {
75 let mut state = self.state.lock();
76 let bundle_id = state
77 .free_and_dirty_bundles
78 .min()
79 .or_else(|| state.free_bundles.min())
80 .expect("run out of space for tuples");
81 assert!(state.object_id_to_bundle.insert(o.id, bundle_id).is_none());
82 let bundle = &mut state.bundle_tuples[usz!(bundle_id)];
83 assert!(bundle.insert(o.id, o).is_none());
84 assert!(u16!(bundle.len()) <= self.max_tuples_per_bundle);
85 if u16!(bundle.len()) == self.max_tuples_per_bundle {
86 state.free_and_dirty_bundles.remove(bundle_id);
87 state.free_bundles.remove(bundle_id);
88 } else {
89 state.free_and_dirty_bundles.insert(bundle_id);
90 }
91 state.dirty_bundles.insert(bundle_id);
92 let (fut, fut_ctl) = SignalFuture::new();
93 state.dirty_signals.push(fut_ctl);
94 fut
95 };
96 fut.await;
97 }
98
99 pub async fn update_object_id_and_state(
100 &self,
101 cur_object_id: ObjectId,
102 new_object_id: ObjectId,
103 new_object_state: ObjectState,
104 ) {
105 let fut = {
106 let mut state = self.state.lock();
107 let bundle_id = state.object_id_to_bundle.remove(&cur_object_id).unwrap();
108 let bundle = &mut state.bundle_tuples[usz!(bundle_id)];
109 let bundle_len = u16!(bundle.len());
110 let mut tuple = bundle.remove(&cur_object_id).unwrap();
111 tuple.id = new_object_id;
112 tuple.state = new_object_state;
113 assert!(bundle.insert(new_object_id, tuple).is_none());
114 assert!(state
115 .object_id_to_bundle
116 .insert(new_object_id, bundle_id)
117 .is_none());
118 if bundle_len < self.max_tuples_per_bundle {
120 state.free_and_dirty_bundles.insert(bundle_id);
121 }
122 state.dirty_bundles.insert(bundle_id);
123 let (fut, fut_ctl) = SignalFuture::new();
124 state.dirty_signals.push(fut_ctl);
125 fut
126 };
127 fut.await;
128 }
129
130 pub async fn delete_object(&self, object_id: ObjectId) -> ObjectTuple {
131 let (tuple, fut) = {
132 let mut state = self.state.lock();
133 let bundle_id = state.object_id_to_bundle.remove(&object_id).unwrap();
134 let tuple = state.bundle_tuples[usz!(bundle_id)]
135 .remove(&object_id)
136 .unwrap();
137 state.free_and_dirty_bundles.insert(bundle_id);
138 state.dirty_bundles.insert(bundle_id);
139 let (fut, fut_ctl) = SignalFuture::new();
140 state.dirty_signals.push(fut_ctl);
141 (tuple, fut)
142 };
143 fut.await;
144 tuple
145 }
146
147 pub async fn start_background_commit_loop(&self, dev: BoundedStore, pages: Pages) {
149 loop {
150 let mut changes = Vec::new();
151 let signals = {
153 let mut state = self.state.lock();
154 for bundle_id in state.dirty_bundles.iter() {
155 let tuples = state.bundle_tuples[usz!(bundle_id)]
156 .values()
157 .cloned()
158 .collect_vec();
159 changes.push((bundle_id, tuples));
160 }
161 state.dirty_bundles.clear();
162 state.free_and_dirty_bundles.clear();
163 state.dirty_signals.drain(..).collect_vec()
164 };
165 if changes.is_empty() {
166 sleep(std::time::Duration::from_micros(10)).await;
168 continue;
169 };
170 iter(changes)
171 .for_each_concurrent(None, |(bundle_id, tuples)| {
172 let dev = dev.clone();
173 let pages = pages.clone();
174 async move {
175 let mut page = pages.allocate_uninitialised(pages.spage_size());
176 let bundle_tuple_count = u16!(tuples.len());
177 for (i, t) in tuples.into_iter().enumerate() {
178 let off = i * usz!(OBJECT_TUPLE_SERIALISED_LEN);
179 t.serialise(&mut page[off..off + usz!(OBJECT_TUPLE_SERIALISED_LEN)]);
180 }
181 if bundle_tuple_count < self.max_tuples_per_bundle {
182 page[usz!(bundle_tuple_count) * usz!(OBJECT_TUPLE_SERIALISED_LEN)] =
183 ObjectState::_EndOfBundleTuples as u8;
184 };
185 dev
186 .write_at(u64!(bundle_id) * pages.spage_size(), page)
187 .await;
188 }
189 })
190 .await;
191 for signal in signals {
192 signal.signal(());
193 }
194 }
195 }
196}
197
198pub(crate) async fn load_raw_tuples_area_from_device(
199 dev: &PartitionStore,
200 heap_dev_offset: u64,
201) -> Buf {
202 dev.read_at(0, heap_dev_offset).await
203}
204
205pub(crate) fn tuple_bundles_count(heap_dev_offset: u64, pages: &Pages) -> u64 {
206 assert_eq!(heap_dev_offset % pages.spage_size(), 0);
207 heap_dev_offset / pages.spage_size()
208}
209
210pub(crate) fn load_tuples_from_raw_tuples_area(
211 entire_tuples_area_raw: &[u8],
212 pages: &Pages,
213 mut on_tuple: impl FnMut(BundleId, ObjectTuple),
214) {
215 assert_eq!(entire_tuples_area_raw.len() % usz!(pages.spage_size()), 0);
216 for (bundle_id, raw) in entire_tuples_area_raw
217 .chunks_exact(usz!(pages.spage_size()))
218 .enumerate()
219 {
220 for tuple_raw in raw.chunks_exact(usz!(OBJECT_TUPLE_SERIALISED_LEN)) {
221 let object_state = ObjectState::from_u8(tuple_raw[0]).unwrap();
222 if object_state == ObjectState::_EndOfBundleTuples {
223 break;
224 };
225 let tuple = ObjectTuple::deserialise(tuple_raw);
226 on_tuple(u32!(bundle_id), tuple);
227 }
228 }
229}