use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::record::Record;
pub(crate) type DecodedBlock = Vec<(Vec<u8>, Record)>;
const ASSUMED_BLOCK_BYTES: usize = 4 * 1024;
const SHARDS: usize = 16;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct BlockKey {
pub(crate) run_id: u64,
pub(crate) block_idx: u32,
}
#[derive(Debug)]
struct Slot {
key: BlockKey,
block: Arc<DecodedBlock>,
referenced: bool,
}
#[derive(Debug)]
struct Shard {
slots: Vec<Option<Slot>>,
index: HashMap<BlockKey, usize>,
hand: usize,
}
impl Shard {
fn with_slots(n: usize) -> Self {
Shard {
slots: (0..n).map(|_| None).collect(),
index: HashMap::with_capacity(n),
hand: 0,
}
}
fn get(&mut self, key: BlockKey) -> Option<Arc<DecodedBlock>> {
let i = *self.index.get(&key)?;
if let Some(slot) = self.slots[i].as_mut() {
slot.referenced = true;
return Some(Arc::clone(&slot.block));
}
None
}
fn insert(&mut self, key: BlockKey, block: Arc<DecodedBlock>) {
if self.slots.is_empty() {
return; }
if let Some(&i) = self.index.get(&key) {
if let Some(slot) = self.slots[i].as_mut() {
slot.block = block;
slot.referenced = true;
}
return;
}
let i = self.free_or_evict();
if let Some(old) = self.slots[i].take() {
let _ = self.index.remove(&old.key);
}
self.slots[i] = Some(Slot {
key,
block,
referenced: true,
});
let _ = self.index.insert(key, i);
}
fn free_or_evict(&mut self) -> usize {
let len = self.slots.len();
for _ in 0..(2 * len) {
let i = self.hand;
self.hand = (self.hand + 1) % len;
match self.slots[i].as_mut() {
None => return i,
Some(slot) if !slot.referenced => return i,
Some(slot) => slot.referenced = false,
}
}
self.hand
}
}
#[derive(Debug)]
pub(crate) struct BlockCache {
shards: Vec<Mutex<Shard>>,
enabled: bool,
}
impl BlockCache {
pub(crate) fn new(capacity_bytes: usize) -> Arc<Self> {
let total_slots = capacity_bytes / ASSUMED_BLOCK_BYTES;
let enabled = total_slots > 0;
let per_shard = if enabled {
total_slots.div_ceil(SHARDS).max(1)
} else {
0
};
let shards = (0..SHARDS)
.map(|_| Mutex::new(Shard::with_slots(per_shard)))
.collect();
Arc::new(BlockCache { shards, enabled })
}
pub(crate) fn get(&self, key: BlockKey) -> Option<Arc<DecodedBlock>> {
if !self.enabled {
return None;
}
self.shard(key)
.lock()
.unwrap_or_else(|p| p.into_inner())
.get(key)
}
pub(crate) fn insert(&self, key: BlockKey, block: Arc<DecodedBlock>) {
if !self.enabled {
return;
}
self.shard(key)
.lock()
.unwrap_or_else(|p| p.into_inner())
.insert(key, block);
}
fn shard(&self, key: BlockKey) -> &Mutex<Shard> {
let mut h = key.run_id.wrapping_mul(0x9E37_79B9_7F4A_7C15);
h ^= u64::from(key.block_idx).wrapping_mul(0xC2B2_AE3D_27D4_EB4F);
&self.shards[(h as usize) & (SHARDS - 1)]
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
fn block(n: u8) -> Arc<DecodedBlock> {
Arc::new(vec![(vec![n], Record::Value(vec![n]))])
}
fn key(run: u64, blk: u32) -> BlockKey {
BlockKey {
run_id: run,
block_idx: blk,
}
}
#[test]
fn test_hit_after_insert() {
let c = BlockCache::new(1 << 20);
assert!(c.get(key(1, 0)).is_none());
c.insert(key(1, 0), block(7));
let got = c.get(key(1, 0)).expect("hit");
assert_eq!(got[0].0, vec![7]);
}
#[test]
fn test_disabled_never_stores() {
let c = BlockCache::new(0);
c.insert(key(1, 0), block(1));
assert!(c.get(key(1, 0)).is_none());
}
#[test]
fn test_distinct_keys_coexist() {
let c = BlockCache::new(1 << 20);
c.insert(key(1, 0), block(1));
c.insert(key(1, 1), block(2));
c.insert(key(2, 0), block(3));
assert_eq!(c.get(key(1, 0)).unwrap()[0].0, vec![1]);
assert_eq!(c.get(key(1, 1)).unwrap()[0].0, vec![2]);
assert_eq!(c.get(key(2, 0)).unwrap()[0].0, vec![3]);
}
#[test]
fn test_eviction_bounds_a_shard() {
let mut shard = Shard::with_slots(1);
shard.insert(key(1, 0), block(1));
assert!(shard.get(key(1, 0)).is_some());
shard.insert(key(1, 1), block(2));
shard.insert(key(1, 2), block(3));
assert!(shard.get(key(1, 2)).is_some());
let resident = [key(1, 0), key(1, 1), key(1, 2)]
.into_iter()
.filter(|k| shard.get(*k).is_some())
.count();
assert_eq!(resident, 1, "a one-slot shard holds exactly one block");
}
#[test]
fn test_reinsert_same_key_updates() {
let c = BlockCache::new(1 << 20);
c.insert(key(5, 2), block(1));
c.insert(key(5, 2), block(9));
assert_eq!(c.get(key(5, 2)).unwrap()[0].0, vec![9]);
}
}