torrust-actix 4.1.2

A rich, fast and efficient BitTorrent Tracker.
use crate::common::structs::number_of_bytes::NumberOfBytes;
use crate::stats::enums::stats_event::StatsEvent;
use crate::tracker::enums::torrent_peers_type::TorrentPeersType;
use crate::tracker::structs::info_hash::InfoHash;
use crate::tracker::structs::peer_id::PeerId;
use crate::tracker::structs::torrent_entry::{AHashMap, TorrentEntry};
use crate::tracker::structs::torrent_peer::TorrentPeer;
use crate::tracker::structs::torrent_peers::TorrentPeers;
use crate::tracker::structs::torrent_tracker::TorrentTracker;
use log::info;
use std::collections::btree_map::Entry;
use std::net::{IpAddr, SocketAddr};

impl TorrentTracker {
    /// Collects the seeds and peers of one torrent, split by address family.
    ///
    /// * `info_hash` - torrent to look up.
    /// * `amount` - cap applied to each returned map separately; `0` means no cap.
    /// * `ip_type` - address families to fill; maps of families that were not
    ///   requested are left empty.
    /// * `self_ip` - when set, entries whose IP equals it are left out.
    ///
    /// Returns `None` when `get_torrent` yields nothing for `info_hash`.
    #[tracing::instrument(level = "debug")]
    pub fn get_torrent_peers(&self, info_hash: InfoHash, amount: usize, ip_type: TorrentPeersType, self_ip: Option<IpAddr>) -> Option<TorrentPeers>
    {
        let (want_v4, want_v6) = match ip_type {
            TorrentPeersType::All => (true, true),
            TorrentPeersType::IPv4 => (true, false),
            TorrentPeersType::IPv6 => (false, true),
        };
        self.get_torrent(info_hash).map(|data| {
            // Families that were not requested get an empty map without scanning.
            let select = |source: &AHashMap<PeerId, TorrentPeer>, family: TorrentPeersType, wanted: bool| {
                if wanted {
                    self.get_peers(source, family, self_ip, amount)
                } else {
                    AHashMap::default()
                }
            };
            TorrentPeers {
                seeds_ipv4: select(&data.seeds, TorrentPeersType::IPv4, want_v4),
                seeds_ipv6: select(&data.seeds, TorrentPeersType::IPv6, want_v6),
                peers_ipv4: select(&data.peers, TorrentPeersType::IPv4, want_v4),
                peers_ipv6: select(&data.peers, TorrentPeersType::IPv6, want_v6),
            }
        })
    }

    /// Copies up to `amount` entries of `peers` that belong to one address family.
    ///
    /// * `type_ip` - `IPv4` or `IPv6` selects the family; `All` matches nothing
    ///   and produces an empty map.
    /// * `self_ip` - when set, entries whose IP equals it are skipped.
    /// * `amount` - maximum number of entries returned; `0` means no limit.
    #[tracing::instrument(level = "debug")]
    #[inline]
    pub fn get_peers(&self, peers: &AHashMap<PeerId, TorrentPeer>, type_ip: TorrentPeersType, self_ip: Option<IpAddr>, amount: usize) -> AHashMap<PeerId, TorrentPeer>
    {
        let wants_v4 = match type_ip {
            // `All` never matches a single family, so there is nothing to scan.
            TorrentPeersType::All => return AHashMap::default(),
            TorrentPeersType::IPv4 => true,
            TorrentPeersType::IPv6 => false,
        };
        // Normalise "0 = unlimited" once so the capacity hint and the cut-off
        // below share one value (the hint used to collapse to 0 when unlimited).
        let limit = if amount == 0 { usize::MAX } else { amount };
        // A socket address is either IPv4 or IPv6, so one comparison covers both families.
        let eligible = |addr: &SocketAddr| -> bool {
            addr.is_ipv4() == wants_v4 && self_ip.is_none_or(|ip| ip != addr.ip())
        };
        let mut selected = AHashMap::default();
        selected.reserve(limit.min(peers.len()));
        for (peer_id, torrent_peer) in peers.iter().filter(|(_, candidate)| eligible(&candidate.peer_addr)).take(limit) {
            selected.insert(*peer_id, torrent_peer.clone());
        }
        selected
    }

    /// Inserts or refreshes a peer on a torrent, creating the torrent entry if needed.
    ///
    /// A peer with `left == 0` is stored under `seeds`, any other under `peers`;
    /// an existing record for `peer_id` is removed from both maps first. The
    /// `Torrents`, `Seeds`, `Peers` and `Completed` statistics are adjusted to match.
    ///
    /// `completed` raises the torrent's completed counter only when the peer
    /// also reports `left == 0`; the same rule applies to new and existing torrents.
    ///
    /// Returns `(previous, current)`: `previous` is a clone of the entry before
    /// the change (`None` if the torrent was just created) and `current` is a
    /// clone of the entry after it.
    ///
    /// # Panics
    ///
    /// Panics if `torrents_sharding` has no shard for the first byte of `info_hash`.
    #[tracing::instrument(level = "debug")]
    pub fn add_torrent_peer(&self, info_hash: InfoHash, peer_id: PeerId, torrent_peer: TorrentPeer, completed: bool) -> (Option<TorrentEntry>, TorrentEntry)
    {
        let shard = self.torrents_sharding.get_shard(info_hash.0[0])
            .expect("torrents_sharding has no shard for the first byte of the info hash");
        let mut torrents = shard.write();
        let is_seed = torrent_peer.left == NumberOfBytes(0);
        // A download only counts as completed when nothing is left to fetch.
        // The occupied branch used to skip the `left` check, so it disagreed
        // with the vacant branch on announces that still had bytes left.
        let counts_as_completed = completed && is_seed;
        match torrents.entry(info_hash) {
            Entry::Vacant(slot) => {
                let mut created = TorrentEntry {
                    seeds: AHashMap::default(),
                    peers: AHashMap::default(),
                    completed: 0,
                    updated: std::time::Instant::now()
                };
                if counts_as_completed {
                    self.update_stats(StatsEvent::Completed, 1);
                    created.completed = 1;
                }
                self.update_stats(StatsEvent::Torrents, 1);
                if is_seed {
                    self.update_stats(StatsEvent::Seeds, 1);
                    created.seeds.insert(peer_id, torrent_peer);
                } else {
                    self.update_stats(StatsEvent::Peers, 1);
                    created.peers.insert(peer_id, torrent_peer);
                }
                let snapshot = slot.insert(created).clone();
                (None, snapshot)
            }
            Entry::Occupied(mut slot) => {
                let before = slot.get().clone();
                let current = slot.get_mut();
                // Drop any stale record first so a role change (peer <-> seed)
                // never leaves the same id in both maps.
                if current.seeds.remove(&peer_id).is_some() {
                    self.update_stats(StatsEvent::Seeds, -1);
                }
                if current.peers.remove(&peer_id).is_some() {
                    self.update_stats(StatsEvent::Peers, -1);
                }
                if counts_as_completed {
                    self.update_stats(StatsEvent::Completed, 1);
                    current.completed += 1;
                }
                if is_seed {
                    self.update_stats(StatsEvent::Seeds, 1);
                    current.seeds.insert(peer_id, torrent_peer);
                } else {
                    self.update_stats(StatsEvent::Peers, 1);
                    current.peers.insert(peer_id, torrent_peer);
                }
                current.updated = std::time::Instant::now();
                (Some(before), current.clone())
            }
        }
    }

    /// Removes a peer from a torrent and, unless `persistent`, drops the torrent
    /// once it has neither seeds nor peers left.
    ///
    /// * `persistent` - keep the torrent entry even when it becomes empty.
    /// * `cleanup` - emit an info log line for the removal.
    ///
    /// Returns `(previous, current)`:
    /// * `(None, None)` when `contains_peer` reports the peer as absent or the
    ///   torrent is not in its shard;
    /// * `(Some(before), None)` when the torrent entry was dropped;
    /// * `(Some(before), Some(after))` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `torrents_sharding` has no shard for the first byte of `info_hash`.
    #[tracing::instrument(level = "debug")]
    pub fn remove_torrent_peer(&self, info_hash: InfoHash, peer_id: PeerId, persistent: bool, cleanup: bool) -> (Option<TorrentEntry>, Option<TorrentEntry>)
    {
        // Pre-check so the shard's write lock is not taken for unknown peers.
        if !self.torrents_sharding.contains_peer(info_hash, peer_id) {
            return (None, None);
        }
        let shard = self.torrents_sharding.get_shard(info_hash.0[0])
            .expect("torrents_sharding has no shard for the first byte of the info hash");
        let mut torrents = shard.write();
        // The torrent may be gone by the time the write lock is held.
        let Entry::Occupied(mut slot) = torrents.entry(info_hash) else {
            return (None, None);
        };
        if cleanup {
            info!("[PEERS] Removing from torrent {info_hash} peer {peer_id}");
        }
        let before = slot.get().clone();
        let current = slot.get_mut();
        if current.seeds.remove(&peer_id).is_some() {
            self.update_stats(StatsEvent::Seeds, -1);
        }
        if current.peers.remove(&peer_id).is_some() {
            self.update_stats(StatsEvent::Peers, -1);
        }
        if persistent || !current.seeds.is_empty() || !current.peers.is_empty() {
            return (Some(before), Some(current.clone()));
        }
        slot.remove();
        self.update_stats(StatsEvent::Torrents, -1);
        (Some(before), None)
    }
}