use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
#[cfg(unix)]
use std::sync::atomic::{AtomicBool, Ordering};
use bytes::Bytes;
use irontide_core::Id20;
type PieceKey = (Id20, u32);
#[allow(dead_code)] const MAX_HOT_PIECES: usize = 16;
#[derive(Debug)]
enum CachedPiece {
Writing {
blocks: BTreeMap<u32, Bytes>,
total_bytes: usize,
piece_size: usize,
},
Skeleton {
flushed: HashSet<u32>,
total_bytes: usize,
piece_size: usize,
},
Cached { data: Bytes },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WriteStatus {
Buffered,
PieceComplete,
WriteThrough,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct BufferPoolStats {
pub write_buffer_bytes: usize,
pub read_cache_bytes: usize,
pub total_entries: usize,
#[allow(dead_code)] pub cache_hits: u64,
#[allow(dead_code)] pub cache_misses: u64,
#[allow(dead_code)] pub read_bytes: u64,
#[allow(dead_code)] pub write_bytes: u64,
pub prefetch_count: u64,
pub eviction_count: u64,
pub skeleton_count: u64,
}
#[cfg(unix)]
static MLOCK_WARNED: AtomicBool = AtomicBool::new(false);
#[cfg(unix)]
fn mlock_bytes(data: &Bytes) {
if data.is_empty() {
return;
}
let ret = unsafe { libc::mlock(data.as_ptr().cast::<libc::c_void>(), data.len()) };
if ret != 0 {
let errno = std::io::Error::last_os_error();
if !MLOCK_WARNED.swap(true, Ordering::Relaxed) {
tracing::warn!("mlock failed (subsequent failures suppressed): {errno}");
}
}
}
#[cfg(unix)]
fn munlock_bytes(data: &Bytes) {
if data.is_empty() {
return;
}
unsafe {
libc::munlock(data.as_ptr().cast::<libc::c_void>(), data.len());
}
}
#[cfg(not(unix))]
fn mlock_bytes(_data: &Bytes) {}
#[cfg(not(unix))]
fn munlock_bytes(_data: &Bytes) {}
struct PieceArc {
capacity: usize,
p: usize,
t1: VecDeque<PieceKey>,
t2: VecDeque<PieceKey>,
b1: VecDeque<PieceKey>,
b2: VecDeque<PieceKey>,
sizes: HashMap<PieceKey, usize>,
t1_bytes: usize,
t2_bytes: usize,
b1_sizes: HashMap<PieceKey, usize>,
b2_sizes: HashMap<PieceKey, usize>,
}
impl PieceArc {
fn new(capacity: usize) -> Self {
Self {
capacity,
p: 0,
t1: VecDeque::new(),
t2: VecDeque::new(),
b1: VecDeque::new(),
b2: VecDeque::new(),
sizes: HashMap::new(),
t1_bytes: 0,
t2_bytes: 0,
b1_sizes: HashMap::new(),
b2_sizes: HashMap::new(),
}
}
fn used_bytes(&self) -> usize {
self.t1_bytes.saturating_add(self.t2_bytes)
}
fn insert(&mut self, key: PieceKey, size: usize) -> Vec<PieceKey> {
let mut evicted = Vec::new();
if self.sizes.contains_key(&key) {
self.promote_to_t2(&key);
if let Some(old_size) = self.sizes.insert(key, size) {
self.t2_bytes = self.t2_bytes.saturating_sub(old_size).saturating_add(size);
}
return evicted;
}
if let Some(pos) = self.b1.iter().position(|k| *k == key) {
let ghost_size = self.b1_sizes.remove(&key).unwrap_or(1);
let b2_total: usize = self.b2_sizes.values().sum();
let b1_total: usize = self
.b1_sizes
.values()
.sum::<usize>()
.saturating_add(ghost_size);
let delta = ghost_size.max(b2_total.checked_div(b1_total.max(1)).unwrap_or(0).max(1));
self.p = self.p.saturating_add(delta).min(self.capacity);
self.b1.remove(pos);
evicted.extend(self.replace(false));
self.t2.push_back(key);
self.sizes.insert(key, size);
self.t2_bytes = self.t2_bytes.saturating_add(size);
self.evict_overflow(&mut evicted);
self.cap_ghost_lists();
return evicted;
}
if let Some(pos) = self.b2.iter().position(|k| *k == key) {
let ghost_size = self.b2_sizes.remove(&key).unwrap_or(1);
let b1_total: usize = self.b1_sizes.values().sum();
let b2_total: usize = self
.b2_sizes
.values()
.sum::<usize>()
.saturating_add(ghost_size);
let delta = ghost_size.max(b1_total.checked_div(b2_total.max(1)).unwrap_or(0).max(1));
self.p = self.p.saturating_sub(delta);
self.b2.remove(pos);
evicted.extend(self.replace(true));
self.t2.push_back(key);
self.sizes.insert(key, size);
self.t2_bytes = self.t2_bytes.saturating_add(size);
self.evict_overflow(&mut evicted);
self.cap_ghost_lists();
return evicted;
}
evicted.extend(self.replace(false));
self.t1.push_back(key);
self.sizes.insert(key, size);
self.t1_bytes = self.t1_bytes.saturating_add(size);
self.evict_overflow(&mut evicted);
self.cap_ghost_lists();
evicted
}
fn access(&mut self, key: &PieceKey) -> bool {
if !self.sizes.contains_key(key) {
return false;
}
self.promote_to_t2(key);
true
}
fn remove(&mut self, key: &PieceKey) {
if let Some(size) = self.sizes.remove(key) {
if let Some(pos) = self.t1.iter().position(|k| k == key) {
self.t1.remove(pos);
self.t1_bytes = self.t1_bytes.saturating_sub(size);
} else if let Some(pos) = self.t2.iter().position(|k| k == key) {
self.t2.remove(pos);
self.t2_bytes = self.t2_bytes.saturating_sub(size);
}
}
if let Some(pos) = self.b1.iter().position(|k| k == key) {
self.b1.remove(pos);
self.b1_sizes.remove(key);
}
if let Some(pos) = self.b2.iter().position(|k| k == key) {
self.b2.remove(pos);
self.b2_sizes.remove(key);
}
}
#[allow(dead_code)] fn t2_keys(&self) -> impl Iterator<Item = &PieceKey> {
self.t2.iter()
}
fn promote_to_t2(&mut self, key: &PieceKey) {
let size = self.sizes.get(key).copied().unwrap_or(0);
if let Some(pos) = self.t1.iter().position(|k| k == key) {
self.t1.remove(pos);
self.t1_bytes = self.t1_bytes.saturating_sub(size);
self.t2.push_back(*key);
self.t2_bytes = self.t2_bytes.saturating_add(size);
} else if let Some(pos) = self.t2.iter().position(|k| k == key) {
self.t2.remove(pos);
self.t2.push_back(*key);
}
}
fn replace(&mut self, b2_hit: bool) -> Option<PieceKey> {
if self.used_bytes() < self.capacity {
return None;
}
if !self.t1.is_empty() && (self.t1_bytes > self.p || (b2_hit && self.t1_bytes == self.p)) {
self.evict_t1_lru()
} else {
self.evict_t2_lru()
}
}
fn evict_t1_lru(&mut self) -> Option<PieceKey> {
let key = self.t1.pop_front()?;
let size = self.sizes.remove(&key).unwrap_or(0);
self.t1_bytes = self.t1_bytes.saturating_sub(size);
self.b1.push_back(key);
self.b1_sizes.insert(key, size);
Some(key)
}
fn evict_t2_lru(&mut self) -> Option<PieceKey> {
let key = self.t2.pop_front()?;
let size = self.sizes.remove(&key).unwrap_or(0);
self.t2_bytes = self.t2_bytes.saturating_sub(size);
self.b2.push_back(key);
self.b2_sizes.insert(key, size);
Some(key)
}
fn evict_overflow(&mut self, evicted: &mut Vec<PieceKey>) {
while self.used_bytes() > self.capacity {
let victim = if !self.t1.is_empty() && self.t1_bytes > self.p {
self.evict_t1_lru()
} else if !self.t2.is_empty() {
self.evict_t2_lru()
} else {
self.evict_t1_lru()
};
match victim {
Some(k) => evicted.push(k),
None => break,
}
}
}
fn cap_ghost_lists(&mut self) {
let ghost_budget = self.capacity;
while self.ghost_bytes_b1() > ghost_budget {
if let Some(k) = self.b1.pop_front() {
self.b1_sizes.remove(&k);
} else {
break;
}
}
while self.ghost_bytes_b2() > ghost_budget {
if let Some(k) = self.b2.pop_front() {
self.b2_sizes.remove(&k);
} else {
break;
}
}
}
fn ghost_bytes_b1(&self) -> usize {
self.b1_sizes.values().sum()
}
fn ghost_bytes_b2(&self) -> usize {
self.b2_sizes.values().sum()
}
}
pub(crate) struct BufferPool {
entries: HashMap<PieceKey, CachedPiece>,
arc: PieceArc,
total_capacity: usize,
write_bytes: usize,
enable_mlock: bool,
write_order: VecDeque<PieceKey>,
cache_hits: u64,
cache_misses: u64,
read_bytes_stat: u64,
write_bytes_stat: u64,
prefetch_count: u64,
eviction_count: u64,
skeleton_count: u64,
}
impl BufferPool {
pub fn with_capacity(total_capacity: usize) -> Self {
Self {
entries: HashMap::new(),
arc: PieceArc::new(total_capacity),
total_capacity,
write_bytes: 0,
enable_mlock: cfg!(unix),
write_order: VecDeque::new(),
cache_hits: 0,
cache_misses: 0,
read_bytes_stat: 0,
write_bytes_stat: 0,
prefetch_count: 0,
eviction_count: 0,
skeleton_count: 0,
}
}
pub fn set_mlock(&mut self, enable: bool) {
self.enable_mlock = enable;
}
pub fn write_block(
&mut self,
key: PieceKey,
begin: u32,
data: Bytes,
piece_size: usize,
) -> WriteStatus {
let data_len = data.len();
self.write_bytes_stat = self.write_bytes_stat.saturating_add(data_len as u64);
match self.entries.get_mut(&key) {
Some(CachedPiece::Writing {
blocks,
total_bytes,
piece_size: ps,
}) => {
if let Some(old) = blocks.insert(begin, data) {
*total_bytes = total_bytes.saturating_sub(old.len());
self.write_bytes = self.write_bytes.saturating_sub(old.len());
}
*total_bytes = total_bytes.saturating_add(data_len);
self.write_bytes = self.write_bytes.saturating_add(data_len);
let complete = *total_bytes >= *ps;
self.maybe_evict();
if complete {
WriteStatus::PieceComplete
} else {
WriteStatus::Buffered
}
}
Some(CachedPiece::Skeleton {
flushed,
total_bytes,
piece_size: ps,
}) => {
flushed.insert(begin);
*total_bytes = total_bytes.saturating_add(data_len);
if *total_bytes >= *ps {
WriteStatus::PieceComplete
} else {
WriteStatus::WriteThrough
}
}
Some(CachedPiece::Cached { .. }) => {
WriteStatus::Buffered
}
None => {
let mut blocks = BTreeMap::new();
blocks.insert(begin, data);
self.entries.insert(
key,
CachedPiece::Writing {
blocks,
total_bytes: data_len,
piece_size,
},
);
self.write_bytes = self.write_bytes.saturating_add(data_len);
self.write_order.push_back(key);
let complete = data_len >= piece_size;
self.maybe_evict();
if complete {
WriteStatus::PieceComplete
} else {
WriteStatus::Buffered
}
}
}
}
#[cfg(test)]
pub fn take_completed_data(&mut self, key: PieceKey) -> Option<Vec<u8>> {
match self.entries.get(&key) {
Some(CachedPiece::Writing {
total_bytes,
piece_size,
..
}) if *total_bytes >= *piece_size => {}
_ => return None,
}
let entry = self.entries.remove(&key)?;
self.remove_from_write_order(&key);
if let CachedPiece::Writing {
blocks,
total_bytes,
..
} = entry
{
self.write_bytes = self.write_bytes.saturating_sub(total_bytes);
let mut assembled = Vec::with_capacity(total_bytes);
for (_, block) in blocks {
assembled.extend_from_slice(&block);
}
Some(assembled)
} else {
None
}
}
pub fn take_all_blocks(&mut self, key: PieceKey) -> Option<Vec<u8>> {
if !matches!(self.entries.get(&key), Some(CachedPiece::Writing { .. })) {
return None;
}
let entry = self.entries.remove(&key)?;
self.remove_from_write_order(&key);
if let CachedPiece::Writing {
blocks,
total_bytes,
..
} = entry
{
self.write_bytes = self.write_bytes.saturating_sub(total_bytes);
let size = blocks
.iter()
.map(|(begin, data)| (*begin as usize).saturating_add(data.len()))
.max()
.unwrap_or(0);
let mut assembled = vec![0u8; size];
for (begin, data) in &blocks {
let start = *begin as usize;
let end = start.saturating_add(data.len());
if end <= assembled.len() {
assembled[start..end].copy_from_slice(data);
}
}
Some(assembled)
} else {
None
}
}
pub fn promote_to_cached(&mut self, key: PieceKey, data: Bytes) {
let size = data.len();
self.remove_entry_internal(&key);
let evicted = self.arc.insert(key, size);
self.handle_evictions(&evicted);
if self.enable_mlock {
mlock_bytes(&data);
}
self.entries.insert(key, CachedPiece::Cached { data });
}
pub fn read_block(&mut self, key: PieceKey, begin: u32, length: usize) -> Option<Bytes> {
match self.entries.get(&key) {
Some(CachedPiece::Writing { blocks, .. }) => {
let block = blocks.get(&begin)?;
if block.len() >= length {
self.cache_hits = self.cache_hits.saturating_add(1);
self.read_bytes_stat = self.read_bytes_stat.saturating_add(length as u64);
Some(block.slice(..length))
} else {
self.cache_misses = self.cache_misses.saturating_add(1);
None
}
}
Some(CachedPiece::Cached { data }) => {
let end = begin as usize + length;
if end > data.len() {
self.cache_misses = self.cache_misses.saturating_add(1);
return None;
}
self.cache_hits = self.cache_hits.saturating_add(1);
self.read_bytes_stat = self.read_bytes_stat.saturating_add(length as u64);
let slice = data.slice(begin as usize..end);
self.arc.access(&key);
Some(slice)
}
Some(CachedPiece::Skeleton { .. }) | None => {
self.cache_misses = self.cache_misses.saturating_add(1);
None
}
}
}
pub fn prefetch_piece(&mut self, key: PieceKey, data: Bytes) {
self.prefetch_count = self.prefetch_count.saturating_add(1);
self.promote_to_cached(key, data);
}
pub fn clear_piece(&mut self, key: PieceKey) {
self.remove_entry_internal(&key);
}
pub fn clear_torrent(&mut self, info_hash: Id20) {
let keys: Vec<PieceKey> = self
.entries
.keys()
.filter(|(ih, _)| *ih == info_hash)
.copied()
.collect();
for key in keys {
self.remove_entry_internal(&key);
}
}
pub fn hot_pieces(&self, info_hash: Id20) -> Vec<u32> {
self.arc
.t2_keys()
.filter(|(ih, _)| *ih == info_hash)
.map(|(_, idx)| *idx)
.take(MAX_HOT_PIECES)
.collect()
}
#[allow(dead_code)] pub fn cached_pieces(&self, info_hash: Id20) -> Vec<u32> {
self.entries
.iter()
.filter_map(|((ih, idx), entry)| {
if *ih == info_hash && matches!(entry, CachedPiece::Cached { .. }) {
Some(*idx)
} else {
None
}
})
.collect()
}
pub fn flush_piece(&mut self, key: PieceKey) -> Option<Vec<(u32, Bytes)>> {
match self.entries.get(&key) {
Some(CachedPiece::Writing { .. }) => {}
_ => return None,
}
let entry = self.entries.remove(&key)?;
self.remove_from_write_order(&key);
if let CachedPiece::Writing {
blocks,
total_bytes,
..
} = entry
{
self.write_bytes = self.write_bytes.saturating_sub(total_bytes);
Some(blocks.into_iter().collect())
} else {
None
}
}
pub fn flush_all(&mut self) -> Vec<(PieceKey, Vec<(u32, Bytes)>)> {
let writing_keys: Vec<PieceKey> = self
.entries
.iter()
.filter_map(|(k, v)| {
if matches!(v, CachedPiece::Writing { .. }) {
Some(*k)
} else {
None
}
})
.collect();
let mut result = Vec::with_capacity(writing_keys.len());
for key in writing_keys {
if let Some(blocks) = self.flush_piece(key) {
result.push((key, blocks));
}
}
result
}
pub fn stats(&self) -> BufferPoolStats {
BufferPoolStats {
write_buffer_bytes: self.write_bytes,
read_cache_bytes: self.arc.used_bytes(),
total_entries: self.entries.len(),
cache_hits: self.cache_hits,
cache_misses: self.cache_misses,
read_bytes: self.read_bytes_stat,
write_bytes: self.write_bytes_stat,
prefetch_count: self.prefetch_count,
eviction_count: self.eviction_count,
skeleton_count: self.skeleton_count,
}
}
fn total_used(&self) -> usize {
self.write_bytes.saturating_add(self.arc.used_bytes())
}
fn maybe_evict(&mut self) {
while self.total_used() > self.total_capacity && self.arc.used_bytes() > 0 {
let victim = if !self.arc.t1.is_empty() && self.arc.t1_bytes > self.arc.p {
self.arc.evict_t1_lru()
} else if !self.arc.t2.is_empty() {
self.arc.evict_t2_lru()
} else {
self.arc.evict_t1_lru()
};
match victim {
Some(key) => {
if let Some(CachedPiece::Cached { data }) = self.entries.remove(&key)
&& self.enable_mlock
{
munlock_bytes(&data);
}
self.eviction_count = self.eviction_count.saturating_add(1);
}
None => break,
}
}
while self.total_used() > self.total_capacity {
let oldest = self.write_order.pop_front();
match oldest {
Some(key) => {
if let Some(CachedPiece::Writing {
blocks,
total_bytes,
piece_size,
}) = self.entries.remove(&key)
{
let flushed: HashSet<u32> = blocks.keys().copied().collect();
self.write_bytes = self.write_bytes.saturating_sub(total_bytes);
self.skeleton_count = self.skeleton_count.saturating_add(1);
self.entries.insert(
key,
CachedPiece::Skeleton {
flushed,
total_bytes,
piece_size,
},
);
}
}
None => break,
}
}
}
fn remove_entry_internal(&mut self, key: &PieceKey) {
if let Some(entry) = self.entries.remove(key) {
match entry {
CachedPiece::Writing { total_bytes, .. } => {
self.write_bytes = self.write_bytes.saturating_sub(total_bytes);
self.remove_from_write_order(key);
}
CachedPiece::Cached { ref data } => {
if self.enable_mlock {
munlock_bytes(data);
}
self.arc.remove(key);
}
CachedPiece::Skeleton { .. } => {
}
}
}
}
fn remove_from_write_order(&mut self, key: &PieceKey) {
if let Some(pos) = self.write_order.iter().position(|k| k == key) {
self.write_order.remove(pos);
}
}
fn handle_evictions(&mut self, evicted: &[PieceKey]) {
for key in evicted {
if let Some(CachedPiece::Cached { data }) = self.entries.remove(key)
&& self.enable_mlock
{
munlock_bytes(&data);
}
self.eviction_count = self.eviction_count.saturating_add(1);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn hash(n: u8) -> Id20 {
let mut b = [0u8; 20];
b[0] = n;
Id20(b)
}
fn data(len: usize) -> Bytes {
Bytes::from(vec![0xAB_u8; len])
}
fn data_fill(len: usize, fill: u8) -> Bytes {
Bytes::from(vec![fill; len])
}
#[test]
fn write_chunk_buffers_until_complete() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let piece_size = 300;
let s1 = pool.write_block(key, 0, data(100), piece_size);
assert_eq!(s1, WriteStatus::Buffered);
let s2 = pool.write_block(key, 100, data(100), piece_size);
assert_eq!(s2, WriteStatus::Buffered);
let s3 = pool.write_block(key, 200, data(100), piece_size);
assert_eq!(s3, WriteStatus::PieceComplete);
}
#[test]
fn write_chunk_accepts_bytes_zero_copy() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let original = Bytes::from(vec![42u8; 100]);
let clone = original.clone();
pool.write_block(key, 0, clone, 200);
let attempt = original;
assert!(attempt.try_into_mut().is_err());
}
#[test]
fn read_chunk_hits_write_cache() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let block = data_fill(100, 0xCC);
pool.write_block(key, 0, block, 200);
let read = pool.read_block(key, 0, 100);
assert!(read.is_some());
assert_eq!(read.as_ref().unwrap().len(), 100);
assert_eq!(pool.cache_hits, 1);
}
#[test]
fn read_chunk_miss_prefetches_full_piece() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let miss = pool.read_block(key, 0, 100);
assert!(miss.is_none());
assert_eq!(pool.cache_misses, 1);
pool.prefetch_piece(key, data_fill(256, 0xDD));
let hit = pool.read_block(key, 0, 100);
assert!(hit.is_some());
assert_eq!(&hit.unwrap()[..], &[0xDD; 100]);
assert_eq!(pool.cache_hits, 1);
assert_eq!(pool.prefetch_count, 1);
}
#[test]
fn read_cache_arc_eviction() {
let mut pool = BufferPool::with_capacity(200);
pool.set_mlock(false);
let k1 = (hash(1), 0);
let k2 = (hash(1), 1);
let k3 = (hash(1), 2);
pool.promote_to_cached(k1, data(100));
pool.promote_to_cached(k2, data(100));
pool.promote_to_cached(k3, data(100));
assert!(pool.read_block(k1, 0, 100).is_none());
assert!(pool.read_block(k2, 0, 100).is_some());
assert!(pool.read_block(k3, 0, 100).is_some());
assert!(pool.eviction_count >= 1);
}
#[test]
fn back_pressure_evicts_cached_first() {
let mut pool = BufferPool::with_capacity(300);
pool.set_mlock(false);
let wk = (hash(1), 0);
let ck1 = (hash(1), 1);
let ck2 = (hash(1), 2);
pool.write_block(wk, 0, data(100), 200);
pool.promote_to_cached(ck1, data(100));
pool.promote_to_cached(ck2, data(100));
pool.write_block(wk, 100, data(100), 200);
assert!(pool.read_block(wk, 0, 100).is_some());
let cached_remaining = [ck1, ck2]
.iter()
.filter(|k| {
pool.entries.contains_key(k)
&& matches!(pool.entries[k], CachedPiece::Cached { .. })
})
.count();
assert!(
cached_remaining < 2,
"expected at least one cached eviction"
);
}
#[test]
fn back_pressure_creates_skeleton() {
let mut pool = BufferPool::with_capacity(100);
pool.set_mlock(false);
let k1 = (hash(1), 0);
let k2 = (hash(1), 1);
pool.write_block(k1, 0, data(80), 200);
pool.write_block(k2, 0, data(80), 200);
let skeleton_count = pool
.entries
.values()
.filter(|v| matches!(v, CachedPiece::Skeleton { .. }))
.count();
assert!(skeleton_count >= 1, "expected at least one Skeleton entry");
assert!(pool.skeleton_count >= 1);
}
#[test]
fn skeleton_entry_completes() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let piece_size = 200;
let mut flushed = HashSet::new();
flushed.insert(0);
pool.entries.insert(
key,
CachedPiece::Skeleton {
flushed,
total_bytes: 100,
piece_size,
},
);
let status = pool.write_block(key, 100, data(100), piece_size);
assert_eq!(status, WriteStatus::PieceComplete);
match pool.entries.get(&key) {
Some(CachedPiece::Skeleton {
total_bytes,
piece_size: ps,
..
}) => {
assert_eq!(*total_bytes, 200);
assert_eq!(*ps, piece_size);
}
other => panic!("expected Skeleton, got {other:?}"),
}
}
#[test]
fn clear_piece_removes_any_state() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let k_writing = (hash(1), 0);
let k_cached = (hash(1), 1);
let k_skeleton = (hash(1), 2);
pool.write_block(k_writing, 0, data(100), 200);
pool.promote_to_cached(k_cached, data(100));
pool.entries.insert(
k_skeleton,
CachedPiece::Skeleton {
flushed: HashSet::new(),
total_bytes: 0,
piece_size: 200,
},
);
pool.clear_piece(k_writing);
pool.clear_piece(k_cached);
pool.clear_piece(k_skeleton);
assert!(pool.entries.is_empty());
assert_eq!(pool.write_bytes, 0);
assert_eq!(pool.arc.used_bytes(), 0);
}
#[test]
fn hash_from_cache_pass_promotes() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let piece_size = 200;
pool.write_block(key, 0, data(100), piece_size);
pool.write_block(key, 100, data(100), piece_size);
let assembled = pool.take_completed_data(key).expect("should be complete");
assert_eq!(assembled.len(), piece_size);
assert!(!pool.entries.contains_key(&key));
pool.promote_to_cached(key, Bytes::from(assembled));
assert!(matches!(
pool.entries.get(&key),
Some(CachedPiece::Cached { .. })
));
assert!(pool.read_block(key, 0, 100).is_some());
}
#[test]
fn hash_from_cache_fail_discards() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let piece_size = 200;
pool.write_block(key, 0, data(100), piece_size);
pool.write_block(key, 100, data(100), piece_size);
let assembled = pool.take_completed_data(key);
assert!(assembled.is_some());
pool.clear_piece(key);
assert!(!pool.entries.contains_key(&key));
assert!(pool.read_block(key, 0, 100).is_none());
}
#[test]
fn read_from_skeleton_falls_to_disk() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
pool.entries.insert(
key,
CachedPiece::Skeleton {
flushed: HashSet::new(),
total_bytes: 0,
piece_size: 200,
},
);
let read = pool.read_block(key, 0, 100);
assert!(read.is_none());
assert_eq!(pool.cache_misses, 1);
}
#[test]
fn hot_pieces_returns_t2_entries() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let ih = hash(1);
for i in 0..5_u32 {
let key = (ih, i);
pool.promote_to_cached(key, data(100));
pool.read_block(key, 0, 50);
}
let hot = pool.hot_pieces(ih);
assert_eq!(hot.len(), 5);
for i in 0..5_u32 {
assert!(hot.contains(&i), "expected piece {i} in hot list");
}
}
#[test]
fn hot_pieces_caps_at_16() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let ih = hash(1);
for i in 0..30_u32 {
let key = (ih, i);
pool.promote_to_cached(key, data(100));
pool.read_block(key, 0, 50);
}
let hot = pool.hot_pieces(ih);
assert_eq!(hot.len(), MAX_HOT_PIECES);
}
#[test]
fn volatile_read_bypasses_cache() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(99), 42);
let read = pool.read_block(key, 0, 100);
assert!(read.is_none());
assert_eq!(pool.cache_misses, 1);
}
#[cfg(unix)]
#[test]
fn mlock_called_on_insert_and_evict() {
use std::sync::atomic::AtomicUsize;
static MLOCK_CALLS: AtomicUsize = AtomicUsize::new(0);
static MUNLOCK_CALLS: AtomicUsize = AtomicUsize::new(0);
MLOCK_CALLS.store(0, Ordering::SeqCst);
MUNLOCK_CALLS.store(0, Ordering::SeqCst);
let mut pool = BufferPool::with_capacity(200);
pool.set_mlock(true);
let k1 = (hash(1), 0);
let k2 = (hash(1), 1);
let k3 = (hash(1), 2);
pool.promote_to_cached(k1, data(100));
pool.promote_to_cached(k2, data(100));
pool.promote_to_cached(k3, data(100));
assert!(pool.eviction_count >= 1, "expected at least one eviction");
assert_eq!(pool.entries.len(), 2);
}
#[test]
fn eviction_flush_error_reverts_to_writing() {
let mut pool = BufferPool::with_capacity(1024 * 1024);
pool.set_mlock(false);
let key = (hash(1), 0);
let piece_size = 300;
let mut flushed = HashSet::new();
flushed.insert(0);
flushed.insert(100);
pool.entries.insert(
key,
CachedPiece::Skeleton {
flushed,
total_bytes: 200,
piece_size,
},
);
let status = pool.write_block(key, 200, data(100), piece_size);
assert_eq!(status, WriteStatus::PieceComplete);
}
#[test]
fn arc_ghost_list_adaptation() {
let mut arc = PieceArc::new(200);
let ih = hash(1);
let k1 = (ih, 0);
let k2 = (ih, 1);
arc.insert(k1, 100);
arc.insert(k2, 100);
let k3 = (ih, 2);
let evicted = arc.insert(k3, 100);
assert!(evicted.contains(&k1) || !arc.sizes.contains_key(&k1));
let p_before = arc.p;
arc.insert(k1, 100);
assert!(
arc.p >= p_before,
"B1 hit should increase p: was {p_before}, now {}",
arc.p
);
arc.access(&k2);
let k4 = (ih, 3);
let k5 = (ih, 4);
arc.insert(k4, 100);
arc.insert(k5, 100);
let p_before_b2 = arc.p;
let b2_key = arc.b2.front().copied();
if let Some(bk) = b2_key {
arc.insert(bk, 100);
assert!(
arc.p <= p_before_b2,
"B2 hit should decrease p: was {p_before_b2}, now {}",
arc.p
);
}
assert!(
arc.ghost_bytes_b1() <= arc.capacity,
"B1 ghost bytes {} exceeds capacity {}",
arc.ghost_bytes_b1(),
arc.capacity
);
assert!(
arc.ghost_bytes_b2() <= arc.capacity,
"B2 ghost bytes {} exceeds capacity {}",
arc.ghost_bytes_b2(),
arc.capacity
);
}
}