torrust-actix 4.2.3

A rich, fast and efficient Bittorrent Tracker.
use crate::tracker::structs::torrent_tracker::TorrentTracker;
use crate::udp::enums::simple_proxy_protocol::SppParseResult;
use crate::udp::structs::number_of_downloads::NumberOfDownloads;
use crate::udp::structs::number_of_peers::NumberOfPeers;
use crate::udp::structs::port::Port;
use crate::udp::structs::response_peer::ResponsePeer;
use crate::udp::structs::simple_proxy_protocol::SppHeader;
use crate::udp::structs::torrent_scrape_statistics::TorrentScrapeStatistics;
use crate::udp::structs::udp_server::UdpServer;
use byteorder::{
    NetworkEndian,
    ReadBytesExt
};
use log::{
    error,
    info
};
use std::io::{
    Cursor,
    Error,
    ErrorKind
};
use std::net::{
    IpAddr,
    Ipv4Addr,
    Ipv6Addr,
    SocketAddr
};
use std::process::exit;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

pub const PROTOCOL_IDENTIFIER: i64 = 4_497_486_125_440;
pub const MAX_SCRAPE_TORRENTS: u8 = 74;
pub const MAX_PACKET_SIZE: usize = 1496;
pub const SPP_HEADER_SIZE: usize = 38;
pub const SPP_MAGIC: u16 = 0x56EC;

#[allow(clippy::too_many_arguments)]
pub async fn udp_service(addr: SocketAddr, udp_threads: usize, worker_threads: usize, recv_buffer_size: usize, send_buffer_size: usize, reuse_address: bool, use_payload_ip: bool, simple_proxy_protocol: bool, data: Arc<TorrentTracker>, rx: tokio::sync::watch::Receiver<bool>, tokio_udp: Arc<Runtime>) -> JoinHandle<()>
{
    let udp_server = UdpServer::new(data, addr, udp_threads, worker_threads, recv_buffer_size, send_buffer_size, reuse_address, use_payload_ip, simple_proxy_protocol).await.unwrap_or_else(|e| {
        error!("Could not listen to the UDP port: {e}");
        exit(1);
    });
    let spp_status = if simple_proxy_protocol { " with Simple Proxy Protocol enabled" } else { "" };
    info!("[UDP] Starting a server listener on {addr} with {udp_threads} UDP threads and {worker_threads} worker threads{spp_status}");
    tokio_udp.spawn(async move {
        udp_server.start(rx).await;
    })
}

#[inline]
pub fn parse_ipv4_peers(bytes: &[u8]) -> Result<Vec<ResponsePeer<Ipv4Addr>>, Error> {
    let chunk_size = 6;
    let peer_count = bytes.len() / chunk_size;
    let mut peers = Vec::with_capacity(peer_count);
    for chunk in bytes.chunks_exact(chunk_size) {
        let ip_bytes: [u8; 4] = chunk[..4].try_into().map_err(|_|
            Error::new(ErrorKind::InvalidData, "Invalid IPv4 address bytes")

        )?;
        let port = (&chunk[4..6]).read_u16::<NetworkEndian>().map_err(|e|
            Error::new(ErrorKind::InvalidData, e)

        )?;
        peers.push(ResponsePeer {
            ip_address: Ipv4Addr::from(ip_bytes),
            port: Port(port),
        });
    }
    Ok(peers)
}

#[inline]
pub fn parse_ipv6_peers(bytes: &[u8]) -> Result<Vec<ResponsePeer<Ipv6Addr>>, Error> {
    let chunk_size = 18;
    let peer_count = bytes.len() / chunk_size;
    let mut peers = Vec::with_capacity(peer_count);
    for chunk in bytes.chunks_exact(chunk_size) {
        let ip_bytes: [u8; 16] = chunk[..16].try_into().map_err(|_|
            Error::new(ErrorKind::InvalidData, "Invalid IPv6 address bytes")

        )?;
        let port = (&chunk[16..18]).read_u16::<NetworkEndian>().map_err(|e|
            Error::new(ErrorKind::InvalidData, e)

        )?;
        peers.push(ResponsePeer {
            ip_address: Ipv6Addr::from(ip_bytes),
            port: Port(port),
        });
    }
    Ok(peers)
}

#[inline]
pub fn parse_scrape_stats(bytes: &[u8]) -> Result<Vec<TorrentScrapeStatistics>, Error> {
    let chunk_size = 12;
    let stats_count = bytes.len() / chunk_size;
    let mut stats = Vec::with_capacity(stats_count);
    for chunk in bytes.chunks_exact(chunk_size) {
        let mut cursor = Cursor::new(chunk);
        let seeders = cursor.read_i32::<NetworkEndian>().map_err(|e|
            Error::new(ErrorKind::InvalidData, e)

        )?;
        let downloads = cursor.read_i32::<NetworkEndian>().map_err(|e|
            Error::new(ErrorKind::InvalidData, e)

        )?;
        let leechers = cursor.read_i32::<NetworkEndian>().map_err(|e|
            Error::new(ErrorKind::InvalidData, e)

        )?;
        stats.push(TorrentScrapeStatistics {
            seeders: NumberOfPeers(seeders),
            completed: NumberOfDownloads(downloads),
            leechers: NumberOfPeers(leechers),
        });
    }
    Ok(stats)
}

pub fn parse_address(bytes: &[u8; 16]) -> IpAddr {
    let is_ipv4_mapped = bytes[0..10] == [0u8; 10] && bytes[10] == 0xff && bytes[11] == 0xff;
    if is_ipv4_mapped {
        IpAddr::V4(Ipv4Addr::new(bytes[12], bytes[13], bytes[14], bytes[15]))
    } else {
        IpAddr::V6(Ipv6Addr::from(*bytes))
    }
}

pub fn parse_spp_header(data: &[u8]) -> SppParseResult {
    if data.len() < 2 {
        return SppParseResult::NotPresent;
    }
    let magic = u16::from_be_bytes([data[0], data[1]]);
    if magic != SPP_MAGIC {
        return SppParseResult::NotPresent;
    }
    if data.len() < SPP_HEADER_SIZE {
        return SppParseResult::Malformed(format!(
            "SPP magic found but packet too small: {} bytes, need {}",
            data.len(),
            SPP_HEADER_SIZE
        ));
    }
    let client_addr_bytes: [u8; 16] = data[2..18]
        .try_into()
        .expect("slice with correct length");
    let client_addr = parse_address(&client_addr_bytes);
    let proxy_addr_bytes: [u8; 16] = data[18..34]
        .try_into()
        .expect("slice with correct length");
    let proxy_addr = parse_address(&proxy_addr_bytes);
    let client_port = u16::from_be_bytes([data[34], data[35]]);
    let proxy_port = u16::from_be_bytes([data[36], data[37]]);
    SppParseResult::Found {
        header: SppHeader {
            client_addr,
            client_port,
            proxy_addr,
            proxy_port,
        },
        payload_offset: SPP_HEADER_SIZE,
    }
}

#[inline]
pub fn has_spp_magic(data: &[u8]) -> bool {
    data.len() >= 2 && data[0] == 0x56 && data[1] == 0xEC
}