extern crate bytes;
extern crate env_logger;
extern crate kcp;
extern crate rand;
extern crate time;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io::{self, Cursor, ErrorKind, Read, Write};
use std::rc::Rc;
use std::thread::sleep;
use std::time::Duration;
use bytes::buf::{Buf, BufMut};
use bytes::BytesMut;
use rand::Rng;
use kcp::Kcp;
#[derive(Debug)]
struct DelayPacket {
buf: BytesMut,
ts: u32,
}
impl DelayPacket {
fn new(buf: BytesMut) -> DelayPacket {
DelayPacket { buf, ts: 0 }
}
fn len(&self) -> usize {
self.buf.len()
}
fn ts(&self) -> u32 {
self.ts
}
fn set_ts(&mut self, ts: u32) {
self.ts = ts;
}
fn reader(self) -> Cursor<BytesMut> {
Cursor::new(self.buf)
}
}
struct Random {
seeds: Vec<u32>,
size: usize,
}
impl Random {
fn new(size: usize) -> Random {
Random {
seeds: vec![0u32; size],
size: 0,
}
}
fn random(&mut self) -> u32 {
if self.seeds.is_empty() {
return 0;
}
if self.size == 0 {
for (i, e) in self.seeds.iter_mut().enumerate() {
*e = i as u32;
}
self.size = self.seeds.len();
}
let i = rand::rng().random_range(0..self.size);
let x = self.seeds[i];
self.size -= 1;
self.seeds[i] = self.seeds[self.size];
x
}
}
#[inline]
fn current() -> u32 {
(time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1000000) as u32
}
struct LatencySimulator {
lostrate: u32,
rttmin: u32,
rttmax: u32,
nmax: usize,
tx1: u32,
tx2: u32,
current: u32,
p12: VecDeque<DelayPacket>,
p21: VecDeque<DelayPacket>,
r12: Random,
r21: Random,
}
impl LatencySimulator {
fn new(lostrate: u32, rttmin: u32, rttmax: u32, nmax: usize) -> LatencySimulator {
LatencySimulator {
lostrate: lostrate / 2,
rttmin: rttmin / 2,
rttmax: rttmax / 2,
nmax,
tx1: 0,
tx2: 0,
current: crate::current(),
p12: VecDeque::new(),
p21: VecDeque::new(),
r12: Random::new(100),
r21: Random::new(100),
}
}
fn send(&mut self, peer: u32, data: &[u8]) -> usize {
if peer == 0 {
self.tx1 += 1;
if self.r12.random() < self.lostrate {
return data.len();
}
if self.p12.len() >= self.nmax {
return data.len();
}
} else {
self.tx2 += 1;
if self.r21.random() < self.lostrate {
return data.len();
}
if self.p21.len() >= self.nmax {
return data.len();
}
}
let mut pkg = DelayPacket::new(BytesMut::from(data));
self.current = crate::current();
let mut delay = self.rttmin;
if self.rttmax > self.rttmin {
delay += rand::random::<u32>() % (self.rttmax - self.rttmin);
}
pkg.set_ts(self.current + delay);
if peer == 0 {
self.p12.push_back(pkg);
} else {
self.p21.push_back(pkg);
}
data.len()
}
fn recv(&mut self, peer: u32, data: &mut [u8]) -> io::Result<usize> {
{
let pkg = if peer == 0 {
match self.p12.front() {
None => {
return Err(io::Error::new(ErrorKind::WouldBlock, "No packet yet"));
}
Some(pkg) => pkg,
}
} else {
match self.p21.front() {
None => {
return Err(io::Error::new(ErrorKind::WouldBlock, "No packet yet"));
}
Some(pkg) => pkg,
}
};
self.current = crate::current();
if self.current < pkg.ts() {
return Err(io::Error::new(ErrorKind::WouldBlock, "No packet yet"));
}
if data.len() < pkg.len() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"Buffer is too small",
));
}
}
let pkg = if peer == 0 {
self.p12.pop_front().unwrap()
} else {
self.p21.pop_front().unwrap()
};
pkg.reader().read(data)
}
}
struct KcpOutput {
sim: Rc<RefCell<LatencySimulator>>,
peer: u32,
}
impl Write for KcpOutput {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
let mut sim = self.sim.borrow_mut();
Ok(sim.send(self.peer, data))
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[derive(Debug)]
enum TestMode {
Default,
Normal,
Fast,
}
fn run(mode: TestMode, msgcount: u32, lostrate: u32) {
let vnet = LatencySimulator::new(lostrate, 60, 125, 1000);
let vnet = Rc::new(RefCell::new(vnet));
let mut kcp1 = Kcp::new(
0x11223344,
KcpOutput {
sim: vnet.clone(),
peer: 0,
},
);
let mut kcp2 = Kcp::new(
0x11223344,
KcpOutput {
sim: vnet.clone(),
peer: 1,
},
);
let mut current = crate::current();
let mut slap = current + 20;
let mut index = 0;
let mut next = 0;
let mut count = 0;
let mut maxrtt = 0;
kcp1.set_wndsize(128, 128);
kcp2.set_wndsize(128, 128);
match mode {
TestMode::Default => {
kcp1.set_nodelay(false, 10, 0, false);
kcp2.set_nodelay(false, 10, 0, false);
}
TestMode::Normal => {
kcp1.set_nodelay(false, 10, 0, true);
kcp2.set_nodelay(false, 10, 0, true);
}
TestMode::Fast => {
kcp1.set_nodelay(true, 10, 2, true);
kcp2.set_nodelay(true, 10, 2, true);
kcp1.set_rx_minrto(10);
kcp2.set_fast_resend(1);
}
}
let mut buf = [0u8; 2000];
while next <= msgcount {
sleep(Duration::from_millis(1));
current = crate::current();
kcp1.update(crate::current()).unwrap();
kcp2.update(crate::current()).unwrap();
while current >= slap {
let mut buf = BytesMut::with_capacity(8);
buf.put_u32_le(index);
index += 1;
buf.put_u32_le(current);
kcp1.send(&buf).unwrap();
slap += 20;
}
loop {
let mut vn = vnet.borrow_mut();
match vn.recv(1, &mut buf) {
Err(..) => break,
Ok(n) => {
kcp2.input(&buf[..n]).unwrap();
}
}
}
loop {
let mut vn = vnet.borrow_mut();
match vn.recv(0, &mut buf) {
Err(..) => break,
Ok(n) => {
kcp1.input(&buf[..n]).unwrap();
}
}
}
loop {
match kcp2.recv(&mut buf) {
Err(..) => break,
Ok(n) => {
kcp2.send(&buf[..n]).unwrap();
}
}
}
loop {
match kcp1.recv(&mut buf) {
Err(..) => break,
Ok(n) => {
let mut cur = Cursor::new(&buf[..n]);
let sn = cur.get_u32_le();
let ts = cur.get_u32_le();
let rtt = current - ts;
if sn != next {
panic!(
"Received not continuously packet: sn {} <-> {}",
count, next
);
}
next += 1;
count += 1;
if rtt > maxrtt {
maxrtt = rtt;
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn kcp_default() {
run(TestMode::Default, 1000, 10);
}
#[test]
fn kcp_normal() {
run(TestMode::Normal, 1000, 10);
}
#[test]
fn kcp_fast() {
run(TestMode::Fast, 1000, 10);
}
#[test]
fn kcp_massive_lost_default() {
run(TestMode::Default, 1000, 50);
}
#[test]
fn kcp_massive_lost_normal() {
run(TestMode::Normal, 1000, 50);
}
#[test]
fn kcp_massive_lost_fast() {
run(TestMode::Fast, 1000, 50);
}
}