use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use crate::layout::BlobGuid;
pub(super) const BOOKKEEPING_SHARDS: usize = 64;
#[derive(Default)]
pub(super) struct MutationState {
pub(super) dirty: HashMap<BlobGuid, u64>,
pub(super) flushing: HashMap<BlobGuid, u64>,
pub(super) pending_deletes: HashMap<BlobGuid, u64>,
pub(super) compact_candidates: MaintenanceQueue,
pub(super) merge_candidates: MaintenanceQueue,
}
impl MutationState {
pub(super) fn is_protected(&self, guid: &BlobGuid) -> bool {
self.dirty.contains_key(guid) || self.flushing.contains_key(guid)
}
pub(super) fn is_protected_or_pending(&self, guid: &BlobGuid) -> bool {
self.is_protected(guid) || self.pending_deletes.contains_key(guid)
}
pub(super) fn remove_dirty(&mut self, guid: &BlobGuid) {
self.dirty.remove(guid);
self.flushing.remove(guid);
}
pub(super) fn remove_maintenance_candidates(&mut self, guid: &BlobGuid) -> (bool, bool) {
(
self.compact_candidates.remove(guid),
self.merge_candidates.remove(guid),
)
}
}
#[derive(Default)]
pub(super) struct MaintenanceQueue {
set: HashSet<BlobGuid>,
queue: VecDeque<BlobGuid>,
}
impl MaintenanceQueue {
pub(super) fn insert(&mut self, guid: BlobGuid) -> bool {
if self.set.insert(guid) {
self.queue.push_back(guid);
true
} else {
false
}
}
pub(super) fn remove(&mut self, guid: &BlobGuid) -> bool {
self.set.remove(guid)
}
fn pop_batch(&mut self, limit: usize) -> Vec<BlobGuid> {
let mut out = Vec::new();
while out.len() < limit {
let Some(guid) = self.queue.pop_front() else {
break;
};
if self.set.remove(&guid) {
out.push(guid);
}
}
out
}
}
pub(super) fn bookkeeping_shard_idx(guid: &BlobGuid) -> usize {
debug_assert!(BOOKKEEPING_SHARDS.is_power_of_two());
let mut lo_bytes = [0u8; 8];
let mut hi_bytes = [0u8; 8];
lo_bytes.copy_from_slice(&guid[0..8]);
hi_bytes.copy_from_slice(&guid[8..16]);
let lo = u64::from_le_bytes(lo_bytes);
let hi = u64::from_le_bytes(hi_bytes);
let mut h = lo ^ hi.rotate_left(27);
h ^= h >> 33;
h = h.wrapping_mul(0xff51_afd7_ed55_8ccd);
h ^= h >> 33;
(h as usize) & (BOOKKEEPING_SHARDS - 1)
}
#[derive(Debug, Clone, Copy)]
pub(super) enum CandidateKind {
Compact,
Merge,
}
pub(super) fn pop_candidate_batch(
shards: &[Mutex<MutationState>; BOOKKEEPING_SHARDS],
cursor: &AtomicUsize,
total: &AtomicUsize,
kind: CandidateKind,
limit: usize,
) -> Vec<BlobGuid> {
if limit == 0 {
return Vec::new();
}
let mut out = Vec::new();
let start = cursor.fetch_add(1, Ordering::Relaxed) & (BOOKKEEPING_SHARDS - 1);
for offset in 0..BOOKKEEPING_SHARDS {
let idx = (start + offset) & (BOOKKEEPING_SHARDS - 1);
let shard = &shards[idx];
let mut state = shard.lock().unwrap();
let queue = match kind {
CandidateKind::Compact => &mut state.compact_candidates,
CandidateKind::Merge => &mut state.merge_candidates,
};
let remaining = limit - out.len();
let popped = queue.pop_batch(remaining);
total.fetch_sub(popped.len(), Ordering::Relaxed);
out.extend(popped);
if out.len() == limit {
return out;
}
}
out
}