use crate::backing_store::BoundedStore;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::object::OBJECT_TUPLE_SERIALISED_LEN;
use crate::pages::Pages;
use croaring::Bitmap;
use futures::stream::iter;
use futures::StreamExt;
use itertools::Itertools;
use off64::u16;
use off64::u32;
use off64::u64;
use off64::usz;
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::sync::Arc;
use tokio::time::sleep;
/// Globally unique identifier of an object.
type ObjectId = u64;
/// Zero-based index of a bundle; also determines the bundle's offset on the device.
type BundleId = u32;
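
// Tuples are persisted in fixed-size "bundles": each bundle occupies exactly one spage on the
// device (at byte offset `bundle_id * spage_size`) and holds at most
// `spage_size / OBJECT_TUPLE_SERIALISED_LEN` serialised tuples. Bundles with spare capacity are
// tracked so inserts can reuse them, and dirty bundles are flushed by a background commit loop.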

#[derive(Default)]
struct TuplesState {
  /// Which bundle currently holds each object's tuple.
  object_id_to_bundle: FxHashMap<ObjectId, BundleId>,
  /// Tuples of each bundle, indexed by bundle ID.
  bundle_tuples: Vec<FxHashMap<ObjectId, ObjectTuple>>,
  /// Bundles with at least one free tuple slot.
  free_bundles: Bitmap,
  /// Bundles with in-memory changes not yet written to the device.
  dirty_bundles: Bitmap,
  /// Bundles that are both free and dirty; preferred by inserts, since they will be rewritten anyway.
  free_and_dirty_bundles: Bitmap,
  /// Wakers for callers awaiting the next commit of dirty bundles.
  dirty_signals: Vec<SignalFutureController<()>>,
}
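
/// In-memory index of all object tuples, grouped into fixed-size on-device bundles. Cheap to
/// clone; all clones share the same state behind an `Arc<Mutex<_>>`.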
#[derive(Clone)]
pub(crate) struct Tuples {
  state: Arc<Mutex<TuplesState>>,
  max_tuples_per_bundle: u16,
}
impl Tuples {
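  /// Rebuild the in-memory index from the bundles read off the device at startup. Bundle IDs
  /// must be contiguous starting from zero; bundles with spare capacity are recorded in
  /// `free_bundles` so later inserts can fill them.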
  pub fn new(
    pages: Pages,
    bundles_with_initial_data: FxHashMap<BundleId, Vec<ObjectTuple>>,
  ) -> Self {
    let max_tuples_per_bundle = u16!(pages.spage_size() / OBJECT_TUPLE_SERIALISED_LEN);
    let mut state = TuplesState::default();
    let mut next_expected_bundle_id = 0;
    for (bundle_id, tuples_init) in bundles_with_initial_data
      .into_iter()
      .sorted_unstable_by_key(|(bundle_id, _)| *bundle_id)
    {
      // Bundle IDs must form a contiguous range starting at zero, as `bundle_tuples` is indexed
      // by bundle ID.
      assert_eq!(bundle_id, next_expected_bundle_id);
      next_expected_bundle_id += 1;
      let bundle_id = u32!(bundle_id);
      let tuple_count = u16!(tuples_init.len());
      assert!(tuple_count <= max_tuples_per_bundle);
      let mut tuples = FxHashMap::<u64, ObjectTuple>::default();
      for t in tuples_init {
        assert!(state.object_id_to_bundle.insert(t.id, bundle_id).is_none());
        assert!(tuples.insert(t.id, t).is_none());
      }
      state.bundle_tuples.push(tuples);
      if tuple_count < max_tuples_per_bundle {
        state.free_bundles.add(bundle_id);
      }
    }
    Self {
      state: Arc::new(Mutex::new(state)),
      max_tuples_per_bundle,
    }
  }
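
  /// Insert a new object tuple into a bundle that has a free slot, preferring one that is
  /// already dirty so that the pending write absorbs the change. Resolves only once the
  /// background commit loop has persisted the bundle to the device.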
  pub async fn insert_object(&self, o: ObjectTuple) {
    let fut = {
      let mut state = self.state.lock();
      let bundle_id = state
        .free_and_dirty_bundles
        .minimum()
        .or_else(|| state.free_bundles.minimum())
        .expect("ran out of space for tuples");
      assert!(state.object_id_to_bundle.insert(o.id, bundle_id).is_none());
      let bundle = &mut state.bundle_tuples[usz!(bundle_id)];
      assert!(bundle.insert(o.id, o).is_none());
      assert!(u16!(bundle.len()) <= self.max_tuples_per_bundle);
      if u16!(bundle.len()) == self.max_tuples_per_bundle {
        // The bundle is now full, so it can no longer accept inserts.
        state.free_and_dirty_bundles.remove(bundle_id);
        state.free_bundles.remove(bundle_id);
      } else {
        state.free_and_dirty_bundles.add(bundle_id);
      }
      state.dirty_bundles.add(bundle_id);
      // Register for the next commit so the caller only resumes once the change is durable.
      let (fut, fut_ctl) = SignalFuture::new();
      state.dirty_signals.push(fut_ctl);
      fut
    };
    fut.await;
  }
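
  /// Re-key an existing tuple under a new object ID and set its new state. The tuple stays in
  /// the same bundle; the bundle is marked dirty and this resolves once the change has been
  /// persisted by the background commit loop.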
  pub async fn update_object_id_and_state(
    &self,
    cur_object_id: u64,
    new_object_id: u64,
    new_object_state: ObjectState,
  ) {
    let fut = {
      let mut state = self.state.lock();
      let bundle_id = state.object_id_to_bundle.remove(&cur_object_id).unwrap();
      let bundle = &mut state.bundle_tuples[usz!(bundle_id)];
      let bundle_len = u16!(bundle.len());
      let mut tuple = bundle.remove(&cur_object_id).unwrap();
      tuple.id = new_object_id;
      tuple.state = new_object_state;
      assert!(bundle.insert(new_object_id, tuple).is_none());
      assert!(state
        .object_id_to_bundle
        .insert(new_object_id, bundle_id)
        .is_none());
      // The tuple count is unchanged, so the bundle is only "free and dirty" if it already had
      // spare capacity.
      if bundle_len < self.max_tuples_per_bundle {
        state.free_and_dirty_bundles.add(bundle_id);
      }
      state.dirty_bundles.add(bundle_id);
      let (fut, fut_ctl) = SignalFuture::new();
      state.dirty_signals.push(fut_ctl);
      fut
    };
    fut.await;
  }
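
  /// Remove the tuple for `object_id` and return it. Resolves once the deletion has been
  /// persisted by the background commit loop.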
  pub async fn delete_object(&self, object_id: u64) -> ObjectTuple {
    let (tuple, fut) = {
      let mut state = self.state.lock();
      let bundle_id = state.object_id_to_bundle.remove(&object_id).unwrap();
      let tuple = state.bundle_tuples[usz!(bundle_id)]
        .remove(&object_id)
        .unwrap();
      // The bundle now has at least one free slot again (it may previously have been full), so
      // make it available to future inserts.
      state.free_bundles.add(bundle_id);
      state.free_and_dirty_bundles.add(bundle_id);
      state.dirty_bundles.add(bundle_id);
      let (fut, fut_ctl) = SignalFuture::new();
      state.dirty_signals.push(fut_ctl);
      (tuple, fut)
    };
    fut.await;
    tuple
  }
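
  /// Endless background loop that makes dirty bundles durable. Each iteration snapshots every
  /// dirty bundle under the lock, writes each one as a single spage to the device concurrently,
  /// and then wakes all callers that were waiting on those changes.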
  pub async fn start_background_commit_loop(&self, dev: BoundedStore, pages: Pages) {
    loop {
      let mut changes = Vec::new();
      let signals = {
        let mut state = self.state.lock();
        // Snapshot the tuples of every dirty bundle so the lock isn't held across device I/O.
        for bundle_id in state.dirty_bundles.iter() {
          let tuples = state.bundle_tuples[usz!(bundle_id)]
            .values()
            .cloned()
            .collect_vec();
          changes.push((bundle_id, tuples));
        }
        state.dirty_bundles.clear();
        state.free_and_dirty_bundles.clear();
        state.dirty_signals.drain(..).collect_vec()
      };
      if changes.is_empty() {
        // Nothing to commit; back off briefly instead of busy-looping.
        sleep(std::time::Duration::from_micros(10)).await;
        continue;
      }
      iter(changes)
        .for_each_concurrent(None, |(bundle_id, tuples)| {
          let dev = dev.clone();
          let pages = pages.clone();
          async move {
            // Serialise the bundle's tuples into a single spage.
            let mut page = pages.allocate_uninitialised(pages.spage_size());
            let bundle_tuple_count = u16!(tuples.len());
            for (i, t) in tuples.into_iter().enumerate() {
              let off = i * usz!(OBJECT_TUPLE_SERIALISED_LEN);
              t.serialise(&mut page[off..off + usz!(OBJECT_TUPLE_SERIALISED_LEN)]);
            }
            if bundle_tuple_count < self.max_tuples_per_bundle {
              // Mark where the tuples end, since they don't fill the entire page.
              page[usz!(bundle_tuple_count) * usz!(OBJECT_TUPLE_SERIALISED_LEN)] =
                ObjectState::_EndOfBundleTuples as u8;
            }
            dev
              .write_at(u64!(bundle_id) * pages.spage_size(), page)
              .await;
          }
        })
        .await;
      // Only signal waiters once all their changes have reached the device.
      for signal in signals {
        signal.signal(());
      }
    }
  }
}
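
// Usage sketch, for illustration only: how a caller might wire this type up. Constructing
// `Pages` and `BoundedStore` depends on the rest of the crate, and `load_bundle_tuples` is a
// hypothetical loader that reads the serialised bundles back off the device.
//
//   let tuples = Tuples::new(pages.clone(), load_bundle_tuples(&dev).await);
//   // The commit loop must run for the lifetime of the process; mutations only resolve once it
//   // has written them out.
//   tokio::spawn({
//     let (tuples, dev, pages) = (tuples.clone(), dev.clone(), pages.clone());
//     async move { tuples.start_background_commit_loop(dev, pages).await }
//   });
//   // Resolves once the new tuple has been persisted.
//   tuples.insert_object(ObjectTuple { /* id, state, ... */ }).await;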