use std::collections::HashMap;
use std::time::{Duration, Instant};
/// An exponentially decaying event counter.
///
/// Each `bump` adds `1.0` to `rate`; between updates the value halves once
/// per `half_life` of elapsed time.
#[derive(Debug, Clone)]
pub struct HeatCounter {
/// Current decayed value; `0.0` for a fresh or fully decayed counter.
rate: f64,
/// Instant up to which `rate` has been decayed.
last_update: Instant,
/// Rate most recently recorded as emitted: `None` until the first
/// recorded emission, `Some(0.0)` after a recorded withdrawal.
last_emitted: Option<f64>,
/// Time for `rate` to halve; a zero duration disables decay.
half_life: Duration,
}
impl HeatCounter {
    /// Creates a counter with a zero rate, anchored at `now`, that has not
    /// emitted anything yet.
    pub fn new(half_life: Duration, now: Instant) -> Self {
        Self {
            rate: 0.0,
            last_update: now,
            last_emitted: None,
            half_life,
        }
    }

    /// Brings `rate` up to date for the time elapsed since the last update.
    ///
    /// The rate is multiplied by `0.5^(elapsed / half_life)`. A zero
    /// `half_life` disables decay entirely. A `now` that lies before the
    /// stored `last_update` is treated as "no time elapsed".
    pub fn decay_to(&mut self, now: Instant) {
        // A stale `now` (for example one sampled before a lock was acquired)
        // must not rewind `last_update`: doing so would make the next forward
        // call decay the same interval a second time.
        let now = now.max(self.last_update);
        if self.half_life.is_zero() || self.rate == 0.0 {
            self.last_update = now;
            return;
        }
        let elapsed = now.saturating_duration_since(self.last_update);
        let half_lives = elapsed.as_secs_f64() / self.half_life.as_secs_f64();
        if half_lives > 64.0 {
            // Beyond 64 half-lives the factor is below 2^-64; skip the `powf`
            // and treat the counter as fully cooled.
            self.rate = 0.0;
        } else {
            self.rate *= 0.5_f64.powf(half_lives);
            // Snap vanishing residue to exactly zero so `rate == 0.0` /
            // `rate <= 0.0` checks elsewhere can fire.
            if self.rate < f64::EPSILON {
                self.rate = 0.0;
            }
        }
        self.last_update = now;
    }

    /// Records one event at `now`: decays first, then adds `1.0`.
    pub fn bump(&mut self, now: Instant) {
        self.decay_to(now);
        self.rate += 1.0;
    }

    /// Returns the rate as of the last update (no decay is applied here).
    pub fn rate(&self) -> f64 {
        self.rate
    }

    /// Returns the rate most recently recorded as emitted, if any.
    pub fn last_emitted(&self) -> Option<f64> {
        self.last_emitted
    }

    /// Remembers `rate` as the most recently emitted value.
    pub fn record_emission(&mut self, rate: f64) {
        self.last_emitted = Some(rate);
    }

    /// Remembers a withdrawal, stored as an emitted rate of `0.0`.
    pub fn record_withdrawal(&mut self) {
        self.last_emitted = Some(0.0);
    }

    /// Returns the configured half-life.
    pub fn half_life(&self) -> Duration {
        self.half_life
    }

    /// Returns the instant up to which the rate has been decayed.
    pub fn last_update(&self) -> Instant {
        self.last_update
    }
}
/// Outcome of evaluating one counter during a registry `tick`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum HeatEmission {
    /// Nothing to report; registries filter this variant out of `tick` results.
    Suppress,
    /// Report `rate`; committing it stores `rate` as the counter's last emitted value.
    Emit { rate: f64 },
    /// Retract the previous report; committing it stores a last emitted value of `0.0`.
    Withdraw,
}
/// Entry cap (8192) used by the `Default` impls of `HeatRegistry` and
/// `BlobHeatRegistry`.
pub const DEFAULT_HEAT_REGISTRY_CAP: usize = 8 * 1024;
/// Heat counters keyed by a `u64` channel identifier, with an optional cap
/// on the number of tracked channels.
#[derive(Debug)]
pub struct HeatRegistry {
/// One counter per tracked channel.
counters: HashMap<u64, HeatCounter>,
/// Maximum number of entries before `entry_mut` evicts one; `0` means no limit.
cap: usize,
}
impl Default for HeatRegistry {
fn default() -> Self {
Self {
counters: HashMap::new(),
cap: DEFAULT_HEAT_REGISTRY_CAP,
}
}
}
impl HeatRegistry {
    /// Creates an empty registry with the default entry cap.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty registry that tracks at most `cap` channels.
    /// A `cap` of `0` disables the limit.
    pub fn with_cap(cap: usize) -> Self {
        let counters = HashMap::new();
        Self { counters, cap }
    }

    /// Creates an empty registry with no entry limit (a cap of `0`).
    pub fn with_cap_unbounded() -> Self {
        Self {
            counters: HashMap::new(),
            cap: 0,
        }
    }

    /// Returns the configured entry cap (`0` means unbounded).
    pub fn cap(&self) -> usize {
        self.cap
    }

    /// Returns the number of tracked channels.
    pub fn len(&self) -> usize {
        self.counters.len()
    }

    /// Returns `true` when no channel is tracked.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the counter for `channel`, creating it if absent.
    ///
    /// `half_life` and `now` are only used to build a new counter; an
    /// existing counter is returned untouched. If inserting a new channel
    /// would exceed a non-zero cap, one existing entry is evicted first.
    pub fn entry_mut(
        &mut self,
        channel: u64,
        half_life: Duration,
        now: Instant,
    ) -> &mut HeatCounter {
        let is_new = !self.counters.contains_key(&channel);
        let at_capacity = self.cap > 0 && self.counters.len() >= self.cap;
        if is_new && at_capacity {
            self.evict_lru();
        }
        self.counters
            .entry(channel)
            .or_insert_with(|| HeatCounter::new(half_life, now))
    }

    /// Removes the entry with the oldest `last_update`, if there is one.
    ///
    /// Entries that tie on `last_update` (for instance right after `tick`
    /// decayed every counter to the same instant) are resolved by map
    /// iteration order.
    fn evict_lru(&mut self) {
        let Some(stalest) = self
            .counters
            .iter()
            .min_by_key(|(_, counter)| counter.last_update())
            .map(|(key, _)| *key)
        else {
            return;
        };
        self.counters.remove(&stalest);
    }

    /// Looks up the counter for `channel` without creating it.
    pub fn get(&self, channel: &u64) -> Option<&HeatCounter> {
        self.counters.get(channel)
    }

    /// Drops the counter for `channel`; does nothing if it is not tracked.
    pub fn remove(&mut self, channel: &u64) {
        let _ = self.counters.remove(channel);
    }

    /// Iterates over all `(channel, counter)` pairs in map order.
    pub fn iter(&self) -> impl Iterator<Item = (&u64, &HeatCounter)> {
        self.counters.iter()
    }

    /// Decays every counter to `now` and returns the channels whose policy
    /// decision is something other than `Suppress`.
    ///
    /// Nothing is recorded as emitted here; pass the emissions that were
    /// actually delivered to [`commit_emissions`](Self::commit_emissions).
    pub fn tick(
        &mut self,
        policy: &super::DataGravityPolicy,
        now: Instant,
    ) -> Vec<(u64, HeatEmission)> {
        self.counters
            .iter_mut()
            .filter_map(|(channel, counter)| {
                counter.decay_to(now);
                match super::should_emit_heat(counter.rate(), counter.last_emitted(), policy) {
                    super::EmissionDecision::Suppress => None,
                    super::EmissionDecision::Emit { rate } => {
                        Some((*channel, HeatEmission::Emit { rate }))
                    }
                    super::EmissionDecision::Withdraw => Some((*channel, HeatEmission::Withdraw)),
                }
            })
            .collect()
    }

    /// Records `emissions` on their counters, then prunes spent entries.
    ///
    /// Emissions for channels that are no longer tracked are ignored. An
    /// entry is pruned once its rate is zero and its last emitted value is
    /// zero as well.
    pub fn commit_emissions(&mut self, emissions: &[(u64, HeatEmission)]) {
        for (channel, emission) in emissions {
            let Some(counter) = self.counters.get_mut(channel) else {
                continue;
            };
            match *emission {
                HeatEmission::Emit { rate } => counter.record_emission(rate),
                HeatEmission::Withdraw => counter.record_withdrawal(),
                HeatEmission::Suppress => {}
            }
        }
        self.counters.retain(|_, counter| {
            let cooled = counter.rate() <= 0.0;
            let retracted = counter.last_emitted().is_some_and(|last| last <= 0.0);
            !(cooled && retracted)
        });
    }
}
/// Heat counters keyed by a 32-byte hash, with an optional entry cap and
/// tracking of emissions that `tick` has handed out but that are not yet
/// committed or rolled back.
#[derive(Debug)]
pub struct BlobHeatRegistry {
/// One counter per tracked hash.
counters: HashMap<[u8; 32], HeatCounter>,
/// Maximum number of entries before `entry_mut` evicts one; `0` means no limit.
cap: usize,
/// Hashes returned by `tick` that have not been committed, rolled back,
/// removed or evicted since; `tick` skips these.
in_flight: std::collections::HashSet<[u8; 32]>,
}
impl Default for BlobHeatRegistry {
fn default() -> Self {
Self {
counters: HashMap::new(),
cap: DEFAULT_HEAT_REGISTRY_CAP,
in_flight: std::collections::HashSet::new(),
}
}
}
impl BlobHeatRegistry {
    /// Creates an empty registry with the default entry cap.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty registry that tracks at most `cap` hashes.
    /// A `cap` of `0` disables the limit.
    pub fn with_cap(cap: usize) -> Self {
        let counters = HashMap::new();
        let in_flight = std::collections::HashSet::new();
        Self {
            counters,
            cap,
            in_flight,
        }
    }

    /// Creates an empty registry with no entry limit (a cap of `0`).
    pub fn with_cap_unbounded() -> Self {
        Self {
            counters: HashMap::new(),
            cap: 0,
            in_flight: std::collections::HashSet::new(),
        }
    }

    /// Returns the configured entry cap (`0` means unbounded).
    pub fn cap(&self) -> usize {
        self.cap
    }

    /// Returns the number of tracked hashes.
    pub fn len(&self) -> usize {
        self.counters.len()
    }

    /// Returns `true` when no hash is tracked.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the counter for `hash`, creating it if absent.
    ///
    /// `half_life` and `now` are only used to build a new counter; an
    /// existing counter is returned untouched. If inserting a new hash would
    /// exceed a non-zero cap, one existing entry is evicted first.
    pub fn entry_mut(
        &mut self,
        hash: [u8; 32],
        half_life: Duration,
        now: Instant,
    ) -> &mut HeatCounter {
        let is_new = !self.counters.contains_key(&hash);
        let at_capacity = self.cap > 0 && self.counters.len() >= self.cap;
        if is_new && at_capacity {
            self.evict_lru();
        }
        self.counters
            .entry(hash)
            .or_insert_with(|| HeatCounter::new(half_life, now))
    }

    /// Removes the entry with the oldest `last_update`, if there is one, and
    /// clears its in-flight marker so a later re-insert of the same hash is
    /// not skipped by `tick`.
    fn evict_lru(&mut self) {
        let Some(stalest) = self
            .counters
            .iter()
            .min_by_key(|(_, counter)| counter.last_update())
            .map(|(key, _)| *key)
        else {
            return;
        };
        self.counters.remove(&stalest);
        self.in_flight.remove(&stalest);
    }

    /// Looks up the counter for `hash` without creating it.
    pub fn get(&self, hash: &[u8; 32]) -> Option<&HeatCounter> {
        self.counters.get(hash)
    }

    /// Drops the counter for `hash` together with its in-flight marker.
    pub fn remove(&mut self, hash: &[u8; 32]) {
        let _ = self.counters.remove(hash);
        let _ = self.in_flight.remove(hash);
    }

    /// Iterates over all `(hash, counter)` pairs in map order.
    pub fn iter(&self) -> impl Iterator<Item = (&[u8; 32], &HeatCounter)> {
        self.counters.iter()
    }

    /// Decays every counter to `now` and returns the hashes whose policy
    /// decision is something other than `Suppress`.
    ///
    /// Hashes already in flight are still decayed but are not evaluated or
    /// returned. Every returned hash is marked in flight until it is
    /// committed, rolled back, removed or evicted.
    pub fn tick(
        &mut self,
        policy: &super::DataGravityPolicy,
        now: Instant,
    ) -> Vec<([u8; 32], HeatEmission)> {
        // Borrow the in-flight set separately so the closure below can read
        // it while the counters are iterated mutably.
        let pending = &self.in_flight;
        let candidates: Vec<([u8; 32], HeatEmission)> = self
            .counters
            .iter_mut()
            .filter_map(|(hash, counter)| {
                counter.decay_to(now);
                if pending.contains(hash) {
                    return None;
                }
                match super::should_emit_heat(counter.rate(), counter.last_emitted(), policy) {
                    super::EmissionDecision::Suppress => None,
                    super::EmissionDecision::Emit { rate } => {
                        Some((*hash, HeatEmission::Emit { rate }))
                    }
                    super::EmissionDecision::Withdraw => Some((*hash, HeatEmission::Withdraw)),
                }
            })
            .collect();
        self.in_flight
            .extend(candidates.iter().map(|(hash, _)| *hash));
        candidates
    }

    /// Records `emissions` on their counters, clears their in-flight
    /// markers, then prunes spent entries.
    ///
    /// The in-flight marker is cleared even when the hash is no longer
    /// tracked. An entry is pruned once its rate is zero and its last
    /// emitted value is zero as well; pruning also clears any in-flight
    /// marker for that hash.
    pub fn commit_emissions(&mut self, emissions: &[([u8; 32], HeatEmission)]) {
        for (hash, emission) in emissions {
            self.in_flight.remove(hash);
            let Some(counter) = self.counters.get_mut(hash) else {
                continue;
            };
            match *emission {
                HeatEmission::Emit { rate } => counter.record_emission(rate),
                HeatEmission::Withdraw => counter.record_withdrawal(),
                HeatEmission::Suppress => {}
            }
        }
        let pending = &mut self.in_flight;
        self.counters.retain(|hash, counter| {
            let spent =
                counter.rate() <= 0.0 && counter.last_emitted().is_some_and(|last| last <= 0.0);
            if spent {
                pending.remove(hash);
            }
            !spent
        });
    }

    /// Clears the in-flight marker for `hash` without recording an emission,
    /// so the next `tick` evaluates the hash again.
    pub fn rollback_emission(&mut self, hash: &[u8; 32]) {
        let _ = self.in_flight.remove(hash);
    }
}
// Unit tests for `HeatCounter`, `HeatRegistry` and `BlobHeatRegistry`.
// The policy type and the emission decision logic live in the parent module
// and are not visible here; tests that depend on them rely on the behavior
// of `DataGravityPolicy::default()`.
// NOTE(review): the policy is reached both as `super::super::DataGravityPolicy`
// and as `super::super::policy::DataGravityPolicy` — presumably a re-export; confirm.
#[cfg(test)]
mod tests {
#![allow(
clippy::disallowed_methods,
reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
)]
use super::*;
// Builds a channel id by OR-ing `seed` into a fixed high-bit pattern.
fn channel(seed: u64) -> u64 {
0xCAFE_BABE_0000_0000 | seed
}
// Returns the current instant; each call yields a fresh reading.
fn t0() -> Instant {
Instant::now()
}
// A new counter starts at rate 0 with no recorded emission.
#[test]
fn fresh_counter_is_zero() {
let c = HeatCounter::new(Duration::from_secs(60), t0());
assert_eq!(c.rate(), 0.0);
assert_eq!(c.last_emitted(), None);
}
// Bumps at the same instant add exactly 1.0 each (no time elapses).
#[test]
fn bump_adds_one_when_no_decay() {
let base = t0();
let mut c = HeatCounter::new(Duration::from_secs(60), base);
c.bump(base);
assert!((c.rate() - 1.0).abs() < 1e-9);
c.bump(base);
assert!((c.rate() - 2.0).abs() < 1e-9);
}
// Starting from 4.0, each additional half-life halves the rate: 2.0, 1.0, 0.5.
#[test]
fn one_half_life_decays_rate_by_half() {
let base = t0();
let half = Duration::from_secs(60);
let mut c = HeatCounter::new(half, base);
c.bump(base);
c.bump(base);
c.bump(base);
c.bump(base);
c.decay_to(base + half);
assert!(
(c.rate() - 2.0).abs() < 1e-6,
"rate after half-life ≈ 2.0; got {}",
c.rate()
);
c.decay_to(base + half * 2);
assert!((c.rate() - 1.0).abs() < 1e-6);
c.decay_to(base + half * 3);
assert!((c.rate() - 0.5).abs() < 1e-6);
}
// 100 half-lives exceeds the 64 half-life cutoff, so the rate is exactly 0.
#[test]
fn long_elapse_clamps_to_zero() {
let base = t0();
let half = Duration::from_secs(60);
let mut c = HeatCounter::new(half, base);
c.bump(base);
c.decay_to(base + half * 100);
assert_eq!(c.rate(), 0.0);
}
// A bump one half-life later decays 2.0 to 1.0 first, then adds 1.0.
#[test]
fn bump_decays_then_adds() {
let base = t0();
let half = Duration::from_secs(60);
let mut c = HeatCounter::new(half, base);
c.bump(base);
c.bump(base);
c.bump(base + half);
assert!((c.rate() - 2.0).abs() < 1e-6);
}
// `record_emission` stores the given rate; `record_withdrawal` stores 0.0.
#[test]
fn record_emission_tracks_last() {
let base = t0();
let mut c = HeatCounter::new(Duration::from_secs(60), base);
c.bump(base);
c.record_emission(1.5);
assert_eq!(c.last_emitted(), Some(1.5));
c.record_withdrawal();
assert_eq!(c.last_emitted(), Some(0.0));
}
// ---- HeatRegistry ----
#[test]
fn new_registry_is_empty() {
let r = HeatRegistry::new();
assert!(r.is_empty());
assert_eq!(r.len(), 0);
}
// `entry_mut` inserts a counter on first access to a channel.
#[test]
fn entry_mut_creates_on_first_access() {
let mut r = HeatRegistry::new();
let half = Duration::from_secs(60);
let counter = r.entry_mut(channel(0xA), half, t0());
counter.bump(t0());
assert!((counter.rate() - 1.0).abs() < 1e-9);
assert_eq!(r.len(), 1);
}
#[test]
fn remove_drops_entry() {
let mut r = HeatRegistry::new();
let half = Duration::from_secs(60);
let _ = r.entry_mut(channel(0xA), half, t0());
r.remove(&channel(0xA));
assert!(r.is_empty());
}
// With cap 2, inserting a third channel evicts the one with the oldest
// `last_update` (A); B was refreshed by a bump and survives.
#[test]
fn entry_mut_at_cap_evicts_lru_on_new_insert() {
let base = t0();
let mut r = HeatRegistry::with_cap(2);
let half = Duration::from_secs(60);
let _ = r.entry_mut(channel(0xA), half, base);
let _ = r.entry_mut(channel(0xB), half, base + Duration::from_secs(1));
assert_eq!(r.len(), 2);
let bumped = r.entry_mut(channel(0xB), half, base + Duration::from_secs(2));
bumped.bump(base + Duration::from_secs(2));
let _ = r.entry_mut(channel(0xC), half, base + Duration::from_secs(3));
assert_eq!(r.len(), 2);
assert!(r.get(&channel(0xA)).is_none(), "LRU entry A evicted");
assert!(r.get(&channel(0xB)).is_some());
assert!(r.get(&channel(0xC)).is_some());
}
// A cap of 0 never evicts.
#[test]
fn entry_mut_cap_zero_is_unbounded() {
let base = t0();
let mut r = HeatRegistry::with_cap(0);
let half = Duration::from_secs(60);
for i in 0..100u64 {
let _ = r.entry_mut(channel(i), half, base);
}
assert_eq!(r.len(), 100);
}
// After a committed withdrawal of a fully decayed counter, the entry is
// pruned by `commit_emissions` (rate 0 and last emitted 0).
#[test]
fn tick_prunes_fully_decayed_withdrawn_entries() {
let base = t0();
let mut r = HeatRegistry::new();
let policy = super::super::DataGravityPolicy::default();
let half = policy.decay_half_life;
let counter = r.entry_mut(channel(0xA), half, base);
counter.bump(base);
let e0 = r.tick(&policy, base);
r.commit_emissions(&e0);
assert_eq!(r.len(), 1);
let later = base + half * 100;
let emissions = r.tick(&policy, later);
assert!(emissions
.iter()
.any(|(_, e)| matches!(e, HeatEmission::Withdraw)));
r.commit_emissions(&emissions);
let after = r.tick(&policy, later + Duration::from_secs(1));
assert!(after.is_empty(), "no further emissions");
r.commit_emissions(&after);
assert_eq!(r.len(), 0, "fully-decayed withdrawn entry pruned");
}
// Expects a single bump to yield one `Emit` under the default policy, and
// no repeat once that emission has been committed.
#[test]
fn tick_emits_first_observation() {
let base = t0();
let mut r = HeatRegistry::new();
let policy = super::super::DataGravityPolicy::default();
let counter = r.entry_mut(channel(0xA), policy.decay_half_life, base);
counter.bump(base);
let emissions = r.tick(&policy, base);
assert_eq!(emissions.len(), 1);
match emissions[0].1 {
HeatEmission::Emit { rate } => assert!(rate > 0.0),
other => panic!("expected Emit, got {other:?}"),
}
r.commit_emissions(&emissions);
let emissions2 = r.tick(&policy, base);
assert!(emissions2.is_empty());
}
// Expects a `Withdraw` once a previously emitted counter has decayed to 0.
#[test]
fn tick_emits_withdrawal_after_decay() {
let base = t0();
let mut r = HeatRegistry::new();
let policy = super::super::DataGravityPolicy::default();
let counter = r.entry_mut(channel(0xA), policy.decay_half_life, base);
counter.bump(base);
let first = r.tick(&policy, base);
r.commit_emissions(&first);
let later = base + policy.decay_half_life * 100;
let emissions = r.tick(&policy, later);
assert_eq!(emissions.len(), 1);
assert_eq!(emissions[0].1, HeatEmission::Withdraw);
}
// Expects a second `Emit` after the rate grows from 1 to 4 following a
// committed emission.
#[test]
fn tick_doubled_rate_re_emits() {
let base = t0();
let mut r = HeatRegistry::new();
let policy = super::super::DataGravityPolicy::default();
let counter = r.entry_mut(channel(0xA), policy.decay_half_life, base);
counter.bump(base);
let first = r.tick(&policy, base);
assert_eq!(first.len(), 1);
r.commit_emissions(&first);
for _ in 0..3 {
r.entry_mut(channel(0xA), policy.decay_half_life, base)
.bump(base);
}
let second = r.tick(&policy, base);
assert_eq!(second.len(), 1);
match second[0].1 {
HeatEmission::Emit { rate } => assert!(rate >= 4.0 * 0.99),
other => panic!("expected Emit, got {other:?}"),
}
}
// `tick` alone records nothing: without a commit the same candidate is
// produced again on the next tick.
#[test]
fn tick_without_commit_reissues_on_next_tick() {
let base = t0();
let mut r = HeatRegistry::new();
let policy = super::super::DataGravityPolicy::default();
let counter = r.entry_mut(channel(0xA), policy.decay_half_life, base);
counter.bump(base);
let candidates = r.tick(&policy, base);
assert_eq!(candidates.len(), 1);
assert!(matches!(candidates[0].1, HeatEmission::Emit { .. }));
let candidates2 = r.tick(&policy, base);
assert_eq!(
candidates2.len(),
1,
"transient sink failure (no commit) must not silence the next tick"
);
assert!(matches!(candidates2[0].1, HeatEmission::Emit { .. }));
}
// ---- BlobHeatRegistry ----
// Builds a 32-byte hash that is all zeros except for `seed` in byte 0.
fn hash(seed: u8) -> [u8; 32] {
let mut h = [0u8; 32];
h[0] = seed;
h
}
#[test]
fn blob_heat_registry_is_empty_by_default() {
let r = BlobHeatRegistry::new();
assert!(r.is_empty());
assert_eq!(r.cap(), DEFAULT_HEAT_REGISTRY_CAP);
}
#[test]
fn blob_heat_entry_mut_creates_then_bumps() {
let mut r = BlobHeatRegistry::new();
let half = Duration::from_secs(60);
let h = hash(0x01);
r.entry_mut(h, half, t0()).bump(t0());
let counter = r.get(&h).expect("entry should exist");
assert!(counter.rate() > 0.0);
}
// With cap 2, the third insert evicts the entry with the oldest `last_update`.
#[test]
fn blob_heat_entry_mut_at_cap_evicts_lru() {
let base = t0();
let mut r = BlobHeatRegistry::with_cap(2);
let half = Duration::from_secs(60);
r.entry_mut(hash(0x01), half, base).bump(base);
r.entry_mut(hash(0x02), half, base + Duration::from_millis(10))
.bump(base + Duration::from_millis(10));
r.entry_mut(hash(0x03), half, base + Duration::from_millis(20))
.bump(base + Duration::from_millis(20));
assert!(r.get(&hash(0x01)).is_none());
assert!(r.get(&hash(0x02)).is_some());
assert!(r.get(&hash(0x03)).is_some());
}
// Expects eight bumps to produce an `Emit` with a positive rate under the
// default policy.
#[test]
fn blob_heat_tick_emits_above_threshold() {
let mut r = BlobHeatRegistry::new();
let policy = super::super::policy::DataGravityPolicy::default();
let half = policy.decay_half_life;
let h = hash(0x42);
let now = t0();
for _ in 0..8 {
r.entry_mut(h, half, now).bump(now);
}
let emissions = r.tick(&policy, now);
assert!(
emissions
.iter()
.any(|(k, e)| *k == h && matches!(e, HeatEmission::Emit { rate } if *rate > 0.0)),
"tick must emit for a heated hash; got {emissions:?}"
);
}
// A hash returned by `tick` is in flight and skipped by further ticks until
// `commit_emissions` clears it; after commit plus more bumps it emits again.
#[test]
fn blob_heat_concurrent_tick_skips_in_flight_candidates() {
let mut r = BlobHeatRegistry::new();
let policy = super::super::policy::DataGravityPolicy::default();
let half = policy.decay_half_life;
let h = hash(0x42);
let now = t0();
for _ in 0..8 {
r.entry_mut(h, half, now).bump(now);
}
let emissions = r.tick(&policy, now);
assert!(emissions.iter().any(|(k, _)| *k == h));
let emissions2 = r.tick(&policy, now);
assert!(
!emissions2.iter().any(|(k, _)| *k == h),
"concurrent tick in the pre-commit window must skip in-flight hashes; \
pre-fix this would re-emit and the sink would receive duplicates"
);
r.commit_emissions(&emissions);
for _ in 0..8 {
r.entry_mut(h, half, now).bump(now);
}
let emissions3 = r.tick(&policy, now);
assert!(
emissions3.iter().any(|(k, _)| *k == h),
"post-commit + further heat must re-enter emission",
);
}
// `rollback_emission` clears the in-flight marker without recording an
// emission, so the next tick offers the hash again.
#[test]
fn blob_heat_rollback_emission_lets_next_tick_reissue() {
let mut r = BlobHeatRegistry::new();
let policy = super::super::policy::DataGravityPolicy::default();
let half = policy.decay_half_life;
let h = hash(0x55);
let now = t0();
for _ in 0..8 {
r.entry_mut(h, half, now).bump(now);
}
let emissions = r.tick(&policy, now);
assert!(emissions.iter().any(|(k, _)| *k == h));
r.rollback_emission(&h);
let emissions2 = r.tick(&policy, now);
assert!(
emissions2.iter().any(|(k, _)| *k == h),
"rollback_emission must re-enable emission on the next tick",
);
}
// `remove` must also clear the in-flight marker; otherwise a re-inserted
// hash would be skipped by `tick`.
#[test]
fn blob_heat_remove_clears_in_flight_so_reintroduced_hash_emits() {
let mut r = BlobHeatRegistry::new();
let policy = super::super::policy::DataGravityPolicy::default();
let half = policy.decay_half_life;
let h = hash(0x77);
let now = t0();
for _ in 0..8 {
r.entry_mut(h, half, now).bump(now);
}
let _ = r.tick(&policy, now);
r.remove(&h);
assert!(r.is_empty());
for _ in 0..8 {
r.entry_mut(h, half, now).bump(now);
}
let emissions = r.tick(&policy, now);
assert!(
emissions.iter().any(|(k, _)| *k == h),
"reintroduced hash must emit again; pre-fix in_flight leak suppressed it forever"
);
}
// LRU eviction must also clear the in-flight marker of the evicted hash.
// h_a is ticked (in flight) and then evicted when h_c is inserted at cap 2.
#[test]
fn blob_heat_evict_clears_in_flight() {
let mut r = BlobHeatRegistry::with_cap(2);
let policy = super::super::policy::DataGravityPolicy::default();
let half = policy.decay_half_life;
let t_a = t0();
let t_b = t_a + Duration::from_millis(10);
let t_c = t_a + Duration::from_millis(20);
let h_a = hash(0xA1);
let h_b = hash(0xB2);
let h_c = hash(0xC3);
for _ in 0..8 {
r.entry_mut(h_a, half, t_a).bump(t_a);
}
let _ = r.tick(&policy, t_a);
for _ in 0..8 {
r.entry_mut(h_b, half, t_b).bump(t_b);
}
for _ in 0..8 {
r.entry_mut(h_c, half, t_c).bump(t_c);
}
assert!(
r.get(&h_a).is_none(),
"h_a must have been evicted (LRU by last_update)"
);
let t_d = t_a + Duration::from_millis(30);
for _ in 0..8 {
r.entry_mut(h_a, half, t_d).bump(t_d);
}
let emissions = r.tick(&policy, t_d);
assert!(
emissions.iter().any(|(k, _)| *k == h_a),
"reintroduced-after-eviction hash must emit; pre-fix the eviction leak \
pinned the new counter as in-flight forever"
);
}
#[test]
fn blob_heat_remove_drops_entry() {
let mut r = BlobHeatRegistry::new();
let half = Duration::from_secs(60);
let h = hash(0x42);
r.entry_mut(h, half, t0()).bump(t0());
r.remove(&h);
assert!(r.is_empty());
}
// Eight threads bump one hash 1000 times each through a shared `Mutex`.
// With a one-hour half-life the final rate should be just under 8000:
// above 99% of it (little decay) and never above it (no double counting).
// Each thread samples `now` before taking the lock.
#[test]
fn blob_heat_concurrent_bump_accumulates_under_outer_mutex() {
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
let registry = Arc::new(Mutex::new(BlobHeatRegistry::new()));
let half = Duration::from_secs(60 * 60); let target = hash(0xAB);
let threads = 8usize;
let per_thread = 1_000usize;
let start = Arc::new(Barrier::new(threads));
let mut handles = Vec::with_capacity(threads);
for _ in 0..threads {
let registry = registry.clone();
let start = start.clone();
handles.push(thread::spawn(move || {
start.wait();
for _ in 0..per_thread {
let now = Instant::now();
let mut guard = registry.lock().unwrap();
guard.entry_mut(target, half, now).bump(now);
}
}));
}
for h in handles {
h.join().expect("worker panicked");
}
let guard = registry.lock().unwrap();
let counter = guard.get(&target).expect("entry must exist");
let expected = (threads * per_thread) as f64;
let rate = counter.rate();
assert!(
rate > expected * 0.99,
"expected rate ≈ {} (8 × 1000 bumps); got {} (lower bound failed)",
expected,
rate,
);
assert!(
rate <= expected,
"expected rate ≤ {} (no double-counting); got {}",
expected,
rate,
);
}
// One thread bumps continuously while another runs 200 ticks, both through
// the same `Mutex`; neither may panic and the entry must still exist.
// Emissions are never committed, so nothing is pruned here.
#[test]
fn blob_heat_tick_concurrent_with_bumps_is_panic_free() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
let registry = Arc::new(Mutex::new(BlobHeatRegistry::new()));
let policy = super::super::policy::DataGravityPolicy::default();
let half = policy.decay_half_life;
let target = hash(0xCD);
let stop = Arc::new(AtomicBool::new(false));
let start = Arc::new(Barrier::new(2));
{
let now = Instant::now();
registry
.lock()
.unwrap()
.entry_mut(target, half, now)
.bump(now);
}
let bumper = {
let registry = registry.clone();
let stop = stop.clone();
let start = start.clone();
thread::spawn(move || {
start.wait();
while !stop.load(Ordering::Relaxed) {
let now = Instant::now();
let mut guard = registry.lock().unwrap();
guard.entry_mut(target, half, now).bump(now);
}
})
};
let ticker = {
let registry = registry.clone();
let stop = stop.clone();
let start = start.clone();
thread::spawn(move || {
start.wait();
for _ in 0..200 {
let now = Instant::now();
let mut guard = registry.lock().unwrap();
let _ = guard.tick(&policy, now);
}
stop.store(true, Ordering::Relaxed);
})
};
bumper.join().expect("bumper panicked");
ticker.join().expect("ticker panicked");
let guard = registry.lock().unwrap();
assert!(
guard.get(&target).is_some(),
"target entry should survive the storm"
);
}
// Four threads insert 64 distinct hashes each (seeds 0..=255, which fits in
// `u8`) into a registry capped at 16; the cap must hold afterwards.
#[test]
fn blob_heat_lru_cap_holds_under_concurrent_inserts() {
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
let cap = 16usize;
let registry = Arc::new(Mutex::new(BlobHeatRegistry::with_cap(cap)));
let half = Duration::from_secs(60);
let threads = 4usize;
let inserts_per_thread = 64u8;
let start = Arc::new(Barrier::new(threads));
let mut handles = Vec::with_capacity(threads);
for tid in 0..threads as u8 {
let registry = registry.clone();
let start = start.clone();
handles.push(thread::spawn(move || {
start.wait();
for i in 0..inserts_per_thread {
let k = hash(tid * inserts_per_thread + i);
let now = Instant::now();
let mut guard = registry.lock().unwrap();
guard.entry_mut(k, half, now);
}
}));
}
for h in handles {
h.join().expect("worker panicked");
}
let guard = registry.lock().unwrap();
assert!(
guard.len() <= cap,
"len() {} exceeded cap {} after concurrent inserts; LRU eviction is broken",
guard.len(),
cap,
);
}
}