use bytes::Bytes;
use futures::prelude::*;
use futures::stream;
use log::info;
use rand::{prelude::StdRng, Rng, SeedableRng};
use srt_tokio::SrtSocket;
use std::time::{Duration, Instant};
use tokio::{net::UdpSocket, time::sleep};
/// Checks that an SRT transfer completes while both endpoints are being hit
/// with random UDP datagrams.
///
/// Three pieces run concurrently:
/// * a sender bound to local port 8877 that calls `127.0.0.1:8876` and sends
///   the numbers `0..100` as text payloads, throttled to one per millisecond;
/// * a receiver listening on port 8876 that expects exactly 100 messages
///   followed by end-of-stream;
/// * a spawned task that sends a datagram of 1 to 1024 random bytes to both
///   ports every millisecond.
///
/// The random generator is seeded from the `INVALID_PACKETS_SEED` environment
/// variable when it is set (it must parse as a `u64`), otherwise from a random
/// seed. The seed is logged either way so a run can be replayed.
///
/// # Panics
///
/// Panics if any socket operation fails, if the receiver does not observe
/// exactly 100 messages followed by the end of the stream, or if the garbage
/// task stops before the transfer has finished.
#[tokio::test]
async fn invalid_packets() {
    let _ = pretty_env_logger::try_init();

    let sender = async {
        let mut sender = SrtSocket::builder()
            .local_port(8877)
            .call("127.0.0.1:8876", None)
            .await
            .unwrap();

        let mut counting_stream =
            tokio_stream::StreamExt::throttle(stream::iter(0..100), Duration::from_millis(1))
                .map(|i| Ok((Instant::now(), Bytes::from(i.to_string()))))
                .boxed();

        sender.send_all(&mut counting_stream).await.unwrap();
        sender.close().await.unwrap();
    };

    let recvr = async {
        let mut recvr = SrtSocket::builder().listen_on(":8876").await.unwrap();
        info!("Receiver initialised");

        for _ in 0..100 {
            recvr.try_next().await.unwrap().unwrap();
        }

        // After the 100 expected messages the stream must be finished.
        assert_eq!(recvr.try_next().await.unwrap(), None);
    };

    let garbage = tokio::spawn(async move {
        let s = match std::env::var("INVALID_PACKETS_SEED") {
            Ok(s) => {
                info!("Using seed from env");
                s.parse()
                    .expect("INVALID_PACKETS_SEED must be an unsigned 64-bit integer")
            }
            Err(_) => rand::random(),
        };
        info!("Seed is {}", s);
        let mut rng = StdRng::seed_from_u64(s);

        let sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();

        loop {
            sleep(Duration::from_millis(1)).await;

            // One `gen_range` call for the length, then one `gen` call per byte.
            // Changing how the buffer is filled would change the datagrams that
            // a given seed produces.
            let mut to_send = vec![0; rng.gen_range(1..=1024)];
            for i in &mut to_send {
                *i = rng.gen();
            }

            // The same datagram goes to the receiver port and the sender port.
            sock.send_to(&to_send[..], "127.0.0.1:8876").await.unwrap();
            sock.send_to(&to_send[..], "127.0.0.1:8877").await.unwrap();
        }
    });

    futures::select! {
        _ = futures::future::join(sender, recvr).fuse() => {},
        // The garbage task loops forever, so its handle resolves only if the
        // task panicked or was cancelled. Fail loudly instead of letting the
        // test pass without the transfer having been verified.
        res = garbage.fuse() => {
            panic!(
                "garbage packet task stopped before the transfer finished: {:?}",
                res.err()
            )
        }
    }
}