srt-protocol 0.4.4

SRT implementation in Rust
Documentation
use std::{
    cmp::max,
    collections::BinaryHeap,
    convert::TryFrom,
    net::SocketAddr,
    time::{Duration, Instant},
};

use log::{error, warn};
use rand::{distributions::Bernoulli, prelude::*};
use rand_distr::Normal;

use srt_protocol::{
    connection::{Connection, ConnectionSettings, DuplexConnection, Input},
    options::*,
    packet::*,
    protocol::handshake::Handshake,
};

/// An [`Input`] tagged with the [`Instant`] at which it should be released.
///
/// Equality and ordering consider only the release time (field `0`); the
/// carried input never takes part in comparisons.
struct ScheduledInput(Instant, Input);

impl PartialEq for ScheduledInput {
    /// Two entries are equal when their release times are equal.
    fn eq(&self, other: &Self) -> bool {
        let (Self(mine, _), Self(theirs, _)) = (self, other);
        mine == theirs
    }
}

impl Eq for ScheduledInput {}

impl PartialOrd for ScheduledInput {
    /// Always `Some`: defers to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for ScheduledInput {
    /// Orders entries by release time, inverted.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Operands are swapped on purpose: an earlier release time compares as
        // "greater", so a max-heap of these entries behaves as a min-heap.
        other.0.cmp(&self.0)
    }
}

/// Queues a paced run of data inputs on `peer`.
///
/// For each `i` in `1..=count`, an `Input::Data(Some(..))` is scheduled at
/// `start + pace * i`; its payload is built from the decimal text of `i` and
/// is stamped with that same instant. One further `Input::Data(None)` is
/// scheduled a single `pace` after the last data input.
///
/// # Panics
///
/// Panics if `count` does not fit in a `u32`.
pub fn input_data_simulation(
    start: Instant,
    count: usize,
    pace: Duration,
    peer: &mut PeerSimulator,
) {
    let total = u32::try_from(count).unwrap();

    let paced = (1..=total).map(|seq| (seq, start + pace * seq));
    for (seq, release_at) in paced {
        let payload = Some((release_at, seq.to_string().into()));
        peer.schedule_input(release_at, Input::Data(payload));
    }

    // Trailing `None`, one pace after the final data input.
    let closing_at = start + pace * (total + 1);
    peer.schedule_input(closing_at, Input::Data(None));
}

/// One simulated endpoint: an address plus the inputs queued for it.
pub struct PeerSimulator {
    /// Address of this peer, as returned by `addr()`.
    addr: SocketAddr,
    /// Inputs waiting to be released. `ScheduledInput` reverses its ordering,
    /// so the top of this heap is the entry with the earliest release time.
    input: BinaryHeap<ScheduledInput>,
}

impl PeerSimulator {
    pub fn new(addr: SocketAddr) -> PeerSimulator {
        PeerSimulator {
            addr,
            input: BinaryHeap::new(),
        }
    }

    pub fn addr(&self) -> SocketAddr {
        self.addr
    }

    pub fn schedule_input(&mut self, release_at: Instant, input: Input) {
        self.input.push(ScheduledInput(release_at, input));
    }

    pub fn select_next_input(&mut self, now: Instant, next_timer: Instant) -> (Instant, Input) {
        if self.has_scheduled_input(next_timer) {
            self.input.pop().map(|i| (max(now, i.0), i.1)).unwrap()
        } else {
            (next_timer, Input::Timer)
        }
    }

    fn has_scheduled_input(&self, now: Instant) -> bool {
        self.input
            .peek()
            .map(|i| i.0)
            .filter(|t| *t <= now)
            .is_some()
    }
}

/// A pair of simulated peers. `send` delivers each packet to whichever peer's
/// address matches the packet's destination.
pub struct NetworkSimulator {
    /// Peer created from the `sender_addr` argument of `new`.
    pub sender: PeerSimulator,
    /// Peer created from the `receiver_addr` argument of `new`.
    pub receiver: PeerSimulator,
}

impl NetworkSimulator {
    pub fn new(sender_addr: SocketAddr, receiver_addr: SocketAddr) -> NetworkSimulator {
        NetworkSimulator {
            sender: PeerSimulator::new(sender_addr),
            receiver: PeerSimulator::new(receiver_addr),
        }
    }

    pub fn send(&mut self, release_at: Instant, (packet, to): (Packet, SocketAddr)) {
        if to == self.sender.addr() {
            self.sender.schedule_input(
                release_at,
                Input::Packet(Ok((packet, self.receiver.addr()))),
            );
        } else if to == self.receiver.addr() {
            self.receiver
                .schedule_input(release_at, Input::Packet(Ok((packet, self.sender.addr()))));
        } else {
            error!("Dropping {:?}", packet)
        }
    }

    pub fn send_lossy(
        &mut self,
        sim: &mut RandomLossSimulation,
        now: Instant,
        packet: (Packet, SocketAddr),
    ) {
        self.send(
            match sim.next_packet_schedule(now) {
                Some(time) => time,
                None => {
                    warn!("Dropping {:?} to {}", packet.0, packet.1);
                    return;
                }
            },
            packet,
        )
    }
}

/// Random source and distributions that decide, per packet, whether it is
/// dropped and how long its delivery is delayed.
pub struct RandomLossSimulation {
    /// Generator sampled for drop decisions, delays, and the random fields of
    /// generated connection settings.
    pub rng: StdRng,
    /// Delivery delay distribution; the absolute value of each sample is
    /// interpreted as a number of seconds.
    pub delay_dist: Normal<f64>,
    /// Drop decision distribution; a `true` sample means the packet is dropped.
    pub drop_dist: Bernoulli,
}

impl RandomLossSimulation {
    pub fn build(
        &mut self,
        start: Instant,
        latency: Duration,
        recv_buffer_size: PacketCount,
    ) -> (NetworkSimulator, DuplexConnection, DuplexConnection) {
        let sender = self.new_connection_settings(start, latency);
        let receiver = ConnectionSettings {
            remote: (sender.remote.ip(), sender.remote.port().wrapping_add(1)).into(),
            remote_sockid: sender.local_sockid,
            local_sockid: sender.remote_sockid,
            init_seq_num: sender.init_seq_num,
            recv_buffer_size,
            ..sender.clone()
        };

        let network = NetworkSimulator::new(receiver.remote, sender.remote);
        let sender = DuplexConnection::new(Connection {
            settings: sender,
            handshake: Handshake::Connector,
        });
        let receiver = DuplexConnection::new(Connection {
            settings: receiver,
            handshake: Handshake::Connector,
        });

        (network, sender, receiver)
    }

    pub fn next_packet_schedule(&mut self, now: Instant) -> Option<Instant> {
        if !self.drop_dist.sample(&mut self.rng) {
            Some(now + Duration::from_secs_f64(self.delay_dist.sample(&mut self.rng).abs()))
        } else {
            None
        }
    }

    fn new_connection_settings(&mut self, start: Instant, latency: Duration) -> ConnectionSettings {
        ConnectionSettings {
            remote: ([127, 0, 0, 1], self.rng.gen()).into(),
            remote_sockid: self.rng.gen(),
            local_sockid: self.rng.gen(),
            socket_start_time: start,
            rtt: Duration::default(),
            init_seq_num: self.rng.gen(),
            max_packet_size: PacketSize(1316),
            max_flow_size: PacketCount(8192),
            send_tsbpd_latency: latency,
            recv_tsbpd_latency: latency,
            cipher: None,
            stream_id: None,
            bandwidth: Default::default(),
            recv_buffer_size: PacketCount(8192),
            send_buffer_size: PacketCount(8192),
            statistics_interval: Duration::from_secs(1),
            peer_idle_timeout: Duration::from_secs(5),
            too_late_packet_drop: true,
        }
    }
}