use crate::bloom::BloomFilter;
use crate::direction::Direction;
use crate::iter::MergeQueueIter;
use crate::queue::{Commit, Merge};
use bytes::Bytes;
use papaya::HashSet;
use std::collections::BTreeMap;
use std::sync::Arc;
/// Fixture pairing one commit with a readset, used to exercise the
/// readset-vs-writeset disjointness checks on [`Commit`] both with and
/// without the bloom-filter variant.
pub struct ReadsetConflictScenario {
/// Commit (id `1`) whose writeset holds the `writeset_keys` passed to `new`.
commit: Arc<Commit>,
/// Keys supplied as `readset_keys` to `new`.
readset: HashSet<Bytes>,
/// Bloom filter populated with the same keys as `readset`.
readset_bloom: BloomFilter,
}
impl ReadsetConflictScenario {
    /// Builds a scenario from the keys a commit wrote and the keys that were read.
    ///
    /// * `writeset_keys` - keys placed in the commit's writeset; duplicates
    ///   collapse because the writeset is a `BTreeMap`. Every key maps to the
    ///   placeholder value `b"v"`.
    /// * `readset_keys` - keys inserted into both the readset and its bloom filter.
    ///
    /// The commit always gets id `1`. Its `min_key` / `max_key` are the first and
    /// last keys of the ordered writeset, or an empty `Bytes` when it is empty.
    pub fn new(writeset_keys: &[Bytes], readset_keys: &[Bytes]) -> Self {
        let writes: BTreeMap<_, _> = writeset_keys
            .iter()
            .map(|key| (key.clone(), Some(Bytes::from_static(b"v"))))
            .collect();

        // Feed the bloom filter from the map (not the input slice) so keys are
        // inserted once each, in sorted order.
        let mut writeset_bloom = BloomFilter::new();
        writes.keys().for_each(|key| {
            writeset_bloom.insert(key);
        });

        // Bounds must be captured before the map is moved into the `Arc`.
        let min_key = writes.keys().next().cloned().unwrap_or_default();
        let max_key = writes.keys().next_back().cloned().unwrap_or_default();

        let commit = Commit {
            id: 1,
            writeset: Arc::new(writes),
            writeset_bloom,
            min_key,
            max_key,
        };

        let readset = HashSet::new();
        let mut readset_bloom = BloomFilter::new();
        {
            // Scope the pinned handle so its borrow of `readset` ends before
            // `readset` is moved into the returned struct.
            let pinned = readset.pin();
            readset_keys.iter().for_each(|key| {
                pinned.insert(key.clone());
                readset_bloom.insert(key);
            });
        }

        Self {
            commit: Arc::new(commit),
            readset,
            readset_bloom,
        }
    }

    /// Returns the result of `Commit::is_disjoint_readset_bloom` for the stored
    /// commit, readset and readset bloom filter.
    ///
    /// NOTE(review): presumably `true` means "no overlap" - confirm against `Commit`.
    pub fn check_with_bloom(&self) -> bool {
        let Self {
            commit,
            readset,
            readset_bloom,
        } = self;
        commit.is_disjoint_readset_bloom(readset, readset_bloom)
    }

    /// Returns the result of `Commit::is_disjoint_readset` for the stored commit
    /// and readset; the bloom filter is not consulted by this call.
    pub fn check_without_bloom(&self) -> bool {
        let readset = &self.readset;
        self.commit.is_disjoint_readset(readset)
    }
}
/// Fixture pairing two commits, used to exercise the writeset-vs-writeset
/// disjointness checks on [`Commit`] with and without the bloom-filter variant.
pub struct WritesetConflictScenario {
/// Commit built from `committed_keys` with id `1`.
committed: Arc<Commit>,
/// Commit built from `current_keys` with id `2`.
current: Arc<Commit>,
}
impl WritesetConflictScenario {
    /// Builds a scenario from two key lists.
    ///
    /// `committed_keys` become the writeset of the commit with id `1`, and
    /// `current_keys` the writeset of the commit with id `2`.
    pub fn new(committed_keys: &[Bytes], current_keys: &[Bytes]) -> Self {
        let committed = Arc::new(Self::build_commit(committed_keys, 1));
        let current = Arc::new(Self::build_commit(current_keys, 2));
        Self { committed, current }
    }

    /// Returns the result of `Commit::is_disjoint_writeset_bloom`, called on the
    /// committed commit with the current commit as argument.
    ///
    /// NOTE(review): presumably `true` means "no overlap" - confirm against `Commit`.
    pub fn check_with_bloom(&self) -> bool {
        let other = &self.current;
        self.committed.is_disjoint_writeset_bloom(other)
    }

    /// Returns the result of `Commit::is_disjoint_writeset`, called on the
    /// committed commit with the current commit as argument.
    pub fn check_without_bloom(&self) -> bool {
        let other = &self.current;
        self.committed.is_disjoint_writeset(other)
    }

    /// Assembles a `Commit` with the given `id` whose writeset maps every key in
    /// `keys` to the placeholder value `b"v"`.
    ///
    /// The bloom filter receives each distinct key once, and `min_key` /
    /// `max_key` are the first and last keys of the ordered writeset (an empty
    /// `Bytes` when `keys` is empty).
    fn build_commit(keys: &[Bytes], id: u64) -> Commit {
        let entries: BTreeMap<_, _> = keys
            .iter()
            .map(|key| (key.clone(), Some(Bytes::from_static(b"v"))))
            .collect();

        let mut writeset_bloom = BloomFilter::new();
        entries.keys().for_each(|key| {
            writeset_bloom.insert(key);
        });

        // Read the bounds while the map is still owned locally.
        let lowest = entries.keys().next().cloned().unwrap_or_default();
        let highest = entries.keys().next_back().cloned().unwrap_or_default();

        Commit {
            id,
            writeset: Arc::new(entries),
            writeset_bloom,
            min_key: lowest,
            max_key: highest,
        }
    }
}
/// Fixture holding a set of merge sources plus a key range, used to drive
/// forward iteration through [`MergeQueueIter`].
pub struct MergeQueueScenario {
/// One `Merge` per source; ids are the source indices `0..num_sources`.
sources: Vec<Arc<Merge>>,
/// Range bound passed as the first key argument to the iterator: `key_00000000`.
beg: Bytes,
/// Range bound passed as the second key argument: `key_` followed by
/// `total_keys` zero-padded to 8 digits.
end: Bytes,
}
impl MergeQueueScenario {
pub fn new(num_sources: usize, keys_per_source: usize, total_keys: usize) -> Self {
let mut sources = Vec::with_capacity(num_sources);
for i in 0..num_sources {
let mut ws = BTreeMap::new();
for j in 0..keys_per_source {
let key_idx = (i.wrapping_mul(31) + j.wrapping_mul(17)) % total_keys;
let key = Bytes::from(format!("key_{:08}", key_idx).into_bytes());
ws.insert(key, Some(Bytes::from_static(b"v")));
}
sources.push(Arc::new(Merge {
id: i as u64,
writeset: Arc::new(ws),
}));
}
let beg = Bytes::from(b"key_00000000".to_vec());
let end = Bytes::from(format!("key_{:08}", total_keys).into_bytes());
Self {
sources,
beg,
end,
}
}
pub fn iter_forward_count(&self) -> usize {
MergeQueueIter::new(
self.sources.clone(),
self.beg.clone(),
self.end.clone(),
Direction::Forward,
)
.count()
}
pub fn iter_forward_take(&self, n: usize) -> usize {
MergeQueueIter::new(
self.sources.clone(),
self.beg.clone(),
self.end.clone(),
Direction::Forward,
)
.take(n)
.count()
}
}