rbit 0.2.2

A BitTorrent library implementing BEP specifications
Documentation
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use bytes::Bytes;
use parking_lot::RwLock;

/// Identifies one cached piece: the info-hash string paired with the piece index.
type CacheKey = (String, u32);

/// A piece payload held in the cache, together with the flag supplied at insertion.
struct CachedPiece {
    /// Raw piece bytes; `data.len()` is the size the cache accounts for this entry.
    data: Bytes,
    /// Copied from the `verified` argument of `PieceCache::insert`. Nothing in the
    /// visible code reads it back, which is why the dead-code lint is silenced.
    #[allow(dead_code)]
    verified: bool,
}

/// Piece cache laid out like an Adaptive Replacement Cache (ARC): two resident lists
/// (`t1`, `t2`) plus two key-only ghost lists (`b1`, `b2`), each behind its own lock.
///
/// `capacity` is compared against entry counts, not bytes; the byte total is tracked
/// separately in `memory_used`.
pub struct PieceCache {
    /// Resident entries that were inserted fresh; a `get` hit moves the entry to `t2`.
    t1: RwLock<LruList>,
    /// Resident entries that were hit while in `t1` or re-inserted after a ghost hit.
    t2: RwLock<LruList>,
    /// Keys of entries evicted from `t1` (no payload kept).
    b1: RwLock<GhostList>,
    /// Keys of entries evicted from `t2` (no payload kept).
    b2: RwLock<GhostList>,
    /// Adaptive target length for `t1`; `adapt_p` keeps it within `0..=capacity`.
    p: AtomicUsize,
    /// Capacity given to `new`, measured in entries.
    capacity: usize,
    /// Running total of `data.len()` over resident pieces, in bytes.
    memory_used: AtomicUsize,
}

/// Recency-ordered store of cached pieces.
///
/// `order` lists keys from least to most recently queued; `data` owns the payloads.
/// `data` is the source of truth: a key found in `order` but not in `data` is skipped.
struct LruList {
    order: VecDeque<CacheKey>,
    data: HashMap<CacheKey, CachedPiece>,
}

impl LruList {
    /// Creates an empty list.
    fn new() -> Self {
        let order = VecDeque::new();
        let data = HashMap::new();
        Self { order, data }
    }

    /// Number of stored pieces.
    fn len(&self) -> usize {
        self.data.len()
    }

    /// Whether a piece is stored under `key`.
    fn contains(&self, key: &CacheKey) -> bool {
        self.data.contains_key(key)
    }

    /// Borrows the piece stored under `key`, without touching its recency.
    fn get(&self, key: &CacheKey) -> Option<&CachedPiece> {
        self.data.get(key)
    }

    /// Stores `piece` under `key` and returns `(new_size, old_size)` in bytes.
    ///
    /// `old_size` is 0 for a new key. Overwriting an existing key keeps its position
    /// in the recency order.
    fn insert(&mut self, key: CacheKey, piece: CachedPiece) -> (usize, usize) {
        let new_size = piece.data.len();
        match self.data.insert(key.clone(), piece) {
            Some(previous) => (new_size, previous.data.len()),
            None => {
                self.order.push_back(key);
                (new_size, 0)
            }
        }
    }

    /// Removes `key`, returning the piece and its size in bytes if it was stored.
    fn remove(&mut self, key: &CacheKey) -> Option<(CachedPiece, usize)> {
        let piece = self.data.remove(key)?;
        self.order.retain(|queued| queued != key);
        let size = piece.data.len();
        Some((piece, size))
    }

    /// Removes and returns the least recently queued entry as `(key, piece, size)`.
    fn pop_front(&mut self) -> Option<(CacheKey, CachedPiece, usize)> {
        loop {
            // `?` ends the search once the queue is exhausted.
            let key = self.order.pop_front()?;
            if let Some(piece) = self.data.remove(&key) {
                let size = piece.data.len();
                return Some((key, piece, size));
            }
        }
    }

    /// Marks `key` as most recently used; does nothing if the key is not stored.
    fn move_to_back(&mut self, key: &CacheKey) {
        if !self.data.contains_key(key) {
            return;
        }
        self.order.retain(|queued| queued != key);
        self.order.push_back(key.clone());
    }
}

/// Key-only history list: remembers which keys were seen, in insertion order,
/// without keeping any payload.
///
/// `set` is the source of truth for membership; `keys` records the order.
struct GhostList {
    keys: VecDeque<CacheKey>,
    set: HashSet<CacheKey>,
}

impl GhostList {
    /// Creates an empty list.
    fn new() -> Self {
        let keys = VecDeque::new();
        let set = HashSet::new();
        Self { keys, set }
    }

    /// Number of remembered keys.
    fn len(&self) -> usize {
        self.set.len()
    }

    /// Whether `key` is remembered.
    fn contains(&self, key: &CacheKey) -> bool {
        self.set.contains(key)
    }

    /// Remembers `key` at the back of the order; a key already present is left as is.
    fn insert(&mut self, key: CacheKey) {
        let newly_added = self.set.insert(key.clone());
        if newly_added {
            self.keys.push_back(key);
        }
    }

    /// Forgets `key`; returns whether it was present.
    fn remove(&mut self, key: &CacheKey) -> bool {
        let was_present = self.set.remove(key);
        if was_present {
            self.keys.retain(|queued| queued != key);
        }
        was_present
    }

    /// Forgets and returns the oldest remembered key, if any.
    fn pop_front(&mut self) -> Option<CacheKey> {
        loop {
            // `?` ends the search once the queue is exhausted.
            let key = self.keys.pop_front()?;
            if self.set.remove(&key) {
                return Some(key);
            }
        }
    }
}

impl PieceCache {
    /// Builds an empty cache wrapped in an `Arc`.
    ///
    /// `capacity` is stored as given and later compared against entry counts. All four
    /// lists start empty, and both the adaptation target and the byte counter start at 0.
    pub fn new(capacity: usize) -> Arc<Self> {
        let cache = Self {
            t1: RwLock::new(LruList::new()),
            t2: RwLock::new(LruList::new()),
            b1: RwLock::new(GhostList::new()),
            b2: RwLock::new(GhostList::new()),
            p: AtomicUsize::new(0),
            capacity,
            memory_used: AtomicUsize::new(0),
        };
        Arc::new(cache)
    }

    pub fn get(&self, info_hash: &str, piece_index: u32) -> Option<Bytes> {
        let key = (info_hash.to_string(), piece_index);

        {
            let t1 = self.t1.read();
            if let Some(piece) = t1.get(&key) {
                let data = piece.data.clone();
                drop(t1);
                self.promote_t1_to_t2(&key);
                return Some(data);
            }
        }

        {
            let t2 = self.t2.read();
            if let Some(piece) = t2.get(&key) {
                let data = piece.data.clone();
                drop(t2);
                self.t2.write().move_to_back(&key);
                return Some(data);
            }
        }

        None
    }

    /// Moves the entry for `key` from `t1` to the back of `t2`, if `t1` still holds it.
    ///
    /// The `t1` lock is released before `t2` is locked, so the two are never held
    /// together. The sizes reported by the lists are ignored and the byte counter is
    /// left untouched.
    fn promote_t1_to_t2(&self, key: &CacheKey) {
        // The write guard is a temporary of this statement and is dropped at its end.
        let taken = self.t1.write().remove(key);
        if let Some((piece, _)) = taken {
            let _ = self.t2.write().insert(key.clone(), piece);
        }
    }

    /// Stores `data` for `(info_hash, piece_index)`, evicting another entry if needed.
    ///
    /// Handled in this order:
    /// 1. Key already resident in `t2` or `t1`: the payload is overwritten in place
    ///    (recency is unchanged) and nothing is evicted.
    /// 2. Key remembered in ghost list `b1`: the target `p` grows, one entry is
    ///    evicted via `replace`, and the piece goes to `t2`.
    /// 3. Key remembered in ghost list `b2`: the target `p` shrinks, one entry is
    ///    evicted via `replace`, and the piece goes to `t2`.
    /// 4. Otherwise: room is made according to the list lengths and `capacity`, and
    ///    the piece goes to `t1`.
    ///
    /// `verified` is stored alongside the payload. The byte counter is adjusted by the
    /// difference between the new and any replaced payload.
    pub fn insert(&self, info_hash: &str, piece_index: u32, data: Bytes, verified: bool) {
        let key = (info_hash.to_string(), piece_index);
        let piece = CachedPiece { data, verified };

        // Resident in T2: overwrite there. Falling through would put a second copy of
        // the key into T1 and count its bytes twice.
        {
            let mut t2 = self.t2.write();
            if t2.contains(&key) {
                let (new_size, old_size) = t2.insert(key, piece);
                self.update_memory(new_size, old_size);
                return;
            }
        }

        // Resident in T1: overwrite there. No new slot is needed, so nothing is evicted
        // (the eviction path below could otherwise throw out this very key).
        {
            let mut t1 = self.t1.write();
            if t1.contains(&key) {
                let (new_size, old_size) = t1.insert(key, piece);
                self.update_memory(new_size, old_size);
                return;
            }
        }

        if self.b1.read().contains(&key) {
            // ARC sizes the adaptation step while the key is still counted in B1,
            // so compute it before removing the ghost entry.
            let delta = self.compute_delta_b1();
            self.b1.write().remove(&key);
            self.adapt_p(delta as isize);
            self.replace(&key, false);
            let (new_size, old_size) = self.t2.write().insert(key, piece);
            self.update_memory(new_size, old_size);
            return;
        }

        if self.b2.read().contains(&key) {
            // Same ordering as above: measure with the key still counted in B2.
            let delta = self.compute_delta_b2();
            self.b2.write().remove(&key);
            self.adapt_p(-(delta as isize));
            self.replace(&key, true);
            let (new_size, old_size) = self.t2.write().insert(key, piece);
            self.update_memory(new_size, old_size);
            return;
        }

        // Complete miss: make room before adding the piece to T1.
        let t1_len = self.t1.read().len();
        let b1_len = self.b1.read().len();
        if t1_len + b1_len >= self.capacity {
            if t1_len < self.capacity {
                // The T1 side is full because of ghosts: forget the oldest ghost,
                // then evict one resident entry.
                self.b1.write().pop_front();
                self.replace(&key, false);
            } else if let Some((_, _, size)) = self.t1.write().pop_front() {
                // T1 alone fills the cache: drop its oldest entry without a ghost.
                self.memory_used.fetch_sub(size, Ordering::Relaxed);
            }
        } else {
            let total = t1_len + b1_len + self.t2.read().len() + self.b2.read().len();
            if total >= self.capacity {
                if total >= 2 * self.capacity {
                    // The directory (resident + ghost) is full: forget the oldest B2 ghost.
                    self.b2.write().pop_front();
                }
                self.replace(&key, false);
            }
        }

        let (new_size, old_size) = self.t1.write().insert(key, piece);
        self.update_memory(new_size, old_size);
    }

    /// Adjusts the byte counter after a payload of `old_size` bytes was replaced by
    /// one of `new_size` bytes (`old_size` is 0 when nothing was replaced).
    fn update_memory(&self, new_size: usize, old_size: usize) {
        if new_size <= old_size {
            // Shrunk or unchanged: subtract the (possibly zero) difference.
            let freed = old_size - new_size;
            self.memory_used.fetch_sub(freed, Ordering::Relaxed);
            return;
        }
        let grown = new_size - old_size;
        self.memory_used.fetch_add(grown, Ordering::Relaxed);
    }

    /// Evicts one resident entry and remembers its key in the matching ghost list.
    ///
    /// Follows ARC's REPLACE rule: the oldest `t1` entry is evicted (key goes to `b1`)
    /// when `t1` is non-empty and either `t1` is longer than the target `p`, or the
    /// triggering key was a `b2` ghost (`in_b2`) and `t1`'s length equals `p`.
    /// Otherwise the oldest `t2` entry is evicted (key goes to `b2`). As a fallback,
    /// `t1` is also chosen when `t2` is empty.
    ///
    /// `_key` is accepted for call-site symmetry but not used. The byte counter is
    /// reduced by the evicted payload's size.
    fn replace(&self, _key: &CacheKey, in_b2: bool) {
        let t1_len = self.t1.read().len();
        let p = self.p.load(Ordering::Relaxed);

        // `t1_len > p` must trigger regardless of `in_b2`: a B2 hit has just lowered
        // `p`, and evicting from T2 in that case would work against the adaptation.
        let t1_over_target = t1_len > p || (in_b2 && t1_len == p);
        let evict_from_t1 = t1_len > 0 && (t1_over_target || self.t2.read().len() == 0);

        if evict_from_t1 {
            if let Some((evicted_key, _, size)) = self.t1.write().pop_front() {
                self.memory_used.fetch_sub(size, Ordering::Relaxed);
                self.b1.write().insert(evicted_key);
            }
        } else if let Some((evicted_key, _, size)) = self.t2.write().pop_front() {
            self.memory_used.fetch_sub(size, Ordering::Relaxed);
            self.b2.write().insert(evicted_key);
        }
    }

    /// Step size used to grow `p` after a `b1` ghost hit.
    ///
    /// Returns 1 when `b1` is at least as long as `b2`; otherwise the integer ratio
    /// `b2_len / b1_len`, with an empty `b1` treated as length 1.
    fn compute_delta_b1(&self) -> usize {
        let own = self.b1.read().len();
        let other = self.b2.read().len();
        if own < other {
            // `max(1)` guards the division when this ghost list is empty.
            other / own.max(1)
        } else {
            1
        }
    }

    /// Step size used to shrink `p` after a `b2` ghost hit.
    ///
    /// Returns 1 when `b2` is at least as long as `b1`; otherwise the integer ratio
    /// `b1_len / b2_len`, with an empty `b2` treated as length 1.
    fn compute_delta_b2(&self) -> usize {
        let other = self.b1.read().len();
        let own = self.b2.read().len();
        if own < other {
            // `max(1)` guards the division when this ghost list is empty.
            other / own.max(1)
        } else {
            1
        }
    }

    /// Shifts the target `p` by `delta` (which may be negative), bounding the result
    /// below by 0 and above by `capacity`.
    ///
    /// The update is a separate load followed by a store, not a single atomic
    /// read-modify-write.
    fn adapt_p(&self, delta: isize) {
        let ceiling = self.capacity as isize;
        let shifted = self.p.load(Ordering::Relaxed) as isize + delta;
        let bounded = shifted.max(0).min(ceiling);
        self.p.store(bounded as usize, Ordering::Relaxed);
    }

    pub fn remove(&self, info_hash: &str, piece_index: u32) -> Option<Bytes> {
        let key = (info_hash.to_string(), piece_index);

        if let Some((piece, size)) = self.t1.write().remove(&key) {
            self.memory_used.fetch_sub(size, Ordering::Relaxed);
            return Some(piece.data);
        }

        if let Some((piece, size)) = self.t2.write().remove(&key) {
            self.memory_used.fetch_sub(size, Ordering::Relaxed);
            return Some(piece.data);
        }

        None
    }

    pub fn contains(&self, info_hash: &str, piece_index: u32) -> bool {
        let key = (info_hash.to_string(), piece_index);
        self.t1.read().contains(&key) || self.t2.read().contains(&key)
    }

    /// Current value of the byte counter maintained by insertions, evictions and
    /// removals (a relaxed atomic load).
    pub fn memory_used(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }

    /// The capacity value this cache was created with.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of resident pieces, i.e. the combined length of `t1` and `t2`.
    /// Ghost entries are not counted.
    pub fn len(&self) -> usize {
        [&self.t1, &self.t2]
            .iter()
            .map(|list| list.read().len())
            .sum()
    }

    /// Whether no piece is resident (`len() == 0`).
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Empties all four lists and resets the adaptation target and byte counter to 0.
    ///
    /// Each list is locked once and both of its halves are cleared under that single
    /// guard. Locking per field would let another thread insert between the two
    /// clears, leaving a payload in `data` with no entry in `order`, which
    /// `pop_front` could then never evict. The lists are still cleared one after
    /// another, so the operation as a whole is not a single atomic step.
    pub fn clear(&self) {
        {
            let mut t1 = self.t1.write();
            t1.data.clear();
            t1.order.clear();
        }
        {
            let mut t2 = self.t2.write();
            t2.data.clear();
            t2.order.clear();
        }
        {
            let mut b1 = self.b1.write();
            b1.keys.clear();
            b1.set.clear();
        }
        {
            let mut b2 = self.b2.write();
            b2.keys.clear();
            b2.set.clear();
        }
        self.p.store(0, Ordering::Relaxed);
        self.memory_used.store(0, Ordering::Relaxed);
    }
}