use std::cmp::{Ord, Ordering};
use std::collections::{binary_heap, BinaryHeap};
use std::convert::From;
use std::hash::{Hash, Hasher};
use std::mem;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Weak;
use std::time::Instant;
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use failure::format_err;
use futures::sync::mpsc;
use futures::task::{self, Task};
use futures::{self, Async, Future, Sink};
use parking_lot::Mutex;
use slog::{info, warn, Logger};
use tokio::timer::Delay;
use crate::connectionmanager::{ConnectionManager, Resender, ResenderEvent};
use crate::handler_data::{ConnectionValue, ConnectionValueWeak, Data};
use crate::packets::*;
use crate::{Error, LockedHashMap};
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct PacketId(PacketType, u32, u16);
#[derive(Clone, Debug)]
struct SendRecord {
pub sent: DateTime<Utc>,
pub last: DateTime<Utc>,
pub tries: usize,
pub id: PacketId,
pub packet: Bytes,
}
impl PartialEq for SendRecord {
fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal }
}
impl Eq for SendRecord {}
impl PartialOrd for SendRecord {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SendRecord {
fn cmp(&self, other: &Self) -> Ordering {
if self.tries == 0 {
if other.tries == 0 {
self.id
.1
.cmp(&other.id.1)
.reverse()
.then_with(|| self.id.2.cmp(&other.id.2).reverse())
} else {
Ordering::Greater
}
} else if other.tries == 0 {
Ordering::Less
} else {
self.last.cmp(&other.last).reverse().then_with(||
self.id.1.cmp(&other.id.1).reverse().then_with(||
self.id.2.cmp(&other.id.2).reverse()))
}
}
}
impl Hash for SendRecord {
fn hash<H: Hasher>(&self, state: &mut H) { self.id.hash(state); }
}
#[derive(Debug)]
pub struct DefaultResender {
logger: Logger,
state: ResendStates,
config: ResendConfig,
srtt: Duration,
srtt_dev: Duration,
resender_task: Vec<Task>,
resender_future_task: Option<Task>,
}
#[derive(Debug)]
enum ResendStates {
Connecting {
to_send: BinaryHeap<SendRecord>,
start_time: DateTime<Utc>,
},
Normal { to_send: BinaryHeap<SendRecord> },
Stalling {
to_send: Vec<SendRecord>,
start_time: DateTime<Utc>,
},
Dead {
to_send: Vec<SendRecord>,
start_time: DateTime<Utc>,
},
Disconnecting {
to_send: BinaryHeap<SendRecord>,
start_time: DateTime<Utc>,
},
}
impl DefaultResender {
pub fn new(config: ResendConfig, logger: Logger) -> Self {
let srtt = config.srtt;
let srtt_dev = config.srtt_dev;
Self {
logger,
state: ResendStates::Connecting {
to_send: Default::default(),
start_time: Utc::now(),
},
config,
srtt,
srtt_dev,
resender_task: Vec::new(),
resender_future_task: None,
}
}
pub fn update_srtt(&mut self, rtt: Duration) {
let diff = if rtt > self.srtt {
rtt - self.srtt
} else {
self.srtt - rtt
};
self.srtt_dev = self.srtt_dev * 3 / 4 + diff / 4;
self.srtt = self.srtt * 7 / 8 + rtt / 8;
}
fn set_state(&mut self, state: ResendStates) -> ResendStates {
info!(self.logger, "Changed state"; "old" => self.state.get_name(),
"new" => state.get_name());
let old = mem::replace(&mut self.state, state);
if let Some(ref task) = self.resender_future_task {
task.notify();
}
old
}
}
impl Drop for DefaultResender {
fn drop(&mut self) {
if let Some(ref task) = self.resender_future_task {
task.notify();
}
}
}
impl Resender for DefaultResender {
fn ack_packet(&mut self, p_type: PacketType, p_id: u16) {
let rec = match &mut self.state {
ResendStates::Stalling { to_send, .. }
| ResendStates::Dead { to_send, .. } => {
if let Some(i) = to_send
.iter()
.position(|rec| rec.id.0 == p_type && rec.id.2 == p_id)
{
Some(to_send.remove(i))
} else {
None
}
}
ResendStates::Connecting { to_send, .. }
| ResendStates::Normal { to_send }
| ResendStates::Disconnecting { to_send, .. } => {
if let Some(is_first) = to_send
.peek()
.map(|rec| rec.id.0 == p_type && rec.id.2 == p_id)
{
if is_first {
to_send.pop()
} else {
let tmp = mem::replace(to_send, BinaryHeap::new());
let mut v = tmp.into_vec();
let mut rec = None;
if let Some(i) = v.iter().position(|rec| {
rec.id.0 == p_type && rec.id.2 == p_id
}) {
rec = Some(v.remove(i));
}
mem::replace(to_send, v.into());
rec
}
} else {
None
}
}
};
if let Some(rec) = rec {
if rec.tries == 1 {
let now = Utc::now();
let diff =
now.naive_utc().signed_duration_since(rec.sent.naive_utc());
self.update_srtt(diff);
}
}
let next_state = match &mut self.state {
ResendStates::Stalling { to_send, .. }
| ResendStates::Dead { to_send, .. } => {
for rec in to_send.iter_mut() {
rec.tries = 0;
}
let to_send = mem::replace(to_send, Vec::new()).into();
self.srtt = self.config.normal_timeout / 4;
Some(ResendStates::Normal { to_send })
}
_ => None,
};
if let Some(next_state) = next_state {
self.set_state(next_state);
if let Some(ref task) = self.resender_future_task {
task.notify();
}
}
for t in self.resender_task.drain(..) {
t.notify()
}
}
fn send_voice_packets(&self, _: PacketType) -> bool {
match self.state {
ResendStates::Connecting { .. }
| ResendStates::Stalling { .. }
| ResendStates::Dead { .. }
| ResendStates::Disconnecting { .. } => false,
ResendStates::Normal { .. } => true,
}
}
fn is_empty(&self) -> bool {
match self.state {
ResendStates::Connecting { ref to_send, .. }
| ResendStates::Disconnecting { ref to_send, .. }
| ResendStates::Normal { ref to_send, .. } => to_send.is_empty(),
ResendStates::Stalling { ref to_send, .. }
| ResendStates::Dead { ref to_send, .. } => to_send.is_empty(),
}
}
fn handle_event(&mut self, event: ResenderEvent) {
let to_send = match &mut self.state {
ResendStates::Stalling { to_send, .. }
| ResendStates::Dead { to_send, .. } => {
let v = mem::replace(to_send, Vec::new());
v.into_iter()
.map(|mut rec| {
rec.tries = 0;
rec
})
.collect()
}
ResendStates::Connecting { to_send, .. }
| ResendStates::Normal { to_send }
| ResendStates::Disconnecting { to_send, .. } => {
mem::replace(to_send, BinaryHeap::new()).into_vec().into()
}
};
let next_state = match event {
ResenderEvent::Connecting => ResendStates::Connecting {
to_send,
start_time: Utc::now(),
},
ResenderEvent::Disconnecting => ResendStates::Disconnecting {
to_send,
start_time: Utc::now(),
},
ResenderEvent::Connected => ResendStates::Normal { to_send },
};
self.set_state(next_state);
if let Some(ref task) = self.resender_future_task {
task.notify();
}
}
fn udp_packet_received(&mut self, _: &Bytes) {
let next_state = match self.state {
ResendStates::Dead {
ref mut to_send, ..
} => {
let to_send = mem::replace(to_send, Vec::new());
Some(ResendStates::Stalling {
to_send,
start_time: Utc::now(),
})
}
_ => None,
};
if let Some(next_state) = next_state {
self.set_state(next_state);
if let Some(ref task) = self.resender_future_task {
task.notify();
}
}
}
fn is_disconnecting(&self) -> bool {
if let ResendStates::Disconnecting { .. } = self.state {
true
} else {
false
}
}
}
impl Sink for DefaultResender {
type SinkItem = (PacketType, u32, u16, Bytes);
type SinkError = Error;
fn start_send(
&mut self,
(p_type, p_gen, p_id, packet): Self::SinkItem,
) -> futures::StartSend<Self::SinkItem, Self::SinkError>
{
let rec = SendRecord {
sent: Utc::now(),
last: Utc::now(),
tries: 0,
id: PacketId(p_type, p_gen, p_id),
packet,
};
let mut rec_res = None;
match &mut self.state {
ResendStates::Connecting {
to_send,
start_time,
}
| ResendStates::Disconnecting {
to_send,
start_time,
} => {
if to_send.len() >= self.config.max_send_queue_len {
rec_res = Some(rec);
} else {
to_send.push(rec);
*start_time = Utc::now();
}
}
ResendStates::Stalling { to_send, .. }
| ResendStates::Dead { to_send, .. } => {
if to_send.len() >= self.config.max_send_queue_len {
rec_res = Some(rec);
} else {
to_send.push(rec);
}
}
ResendStates::Normal { to_send } => {
if to_send.len() >= self.config.max_send_queue_len {
rec_res = Some(rec);
} else {
to_send.push(rec);
}
}
}
if let Some(rec) = rec_res {
self.resender_task.push(task::current());
Ok(futures::AsyncSink::NotReady((
rec.id.0, rec.id.1, rec.id.2, rec.packet,
)))
} else {
if let Some(ref task) = self.resender_future_task {
task.notify();
}
Ok(futures::AsyncSink::Ready)
}
}
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
Ok(futures::Async::Ready(()))
}
}
enum PeekMut<'a, T: Ord + 'a> {
Ref(&'a mut T),
Heap(binary_heap::PeekMut<'a, T>),
}
impl<'a, T: Ord + 'a> Deref for PeekMut<'a, T> {
type Target = T;
fn deref(&self) -> &T {
match *self {
PeekMut::Ref(ref r) => r,
PeekMut::Heap(ref r) => r.deref(),
}
}
}
impl<'a, T: Ord + 'a> DerefMut for PeekMut<'a, T> {
fn deref_mut(&mut self) -> &mut T {
match *self {
PeekMut::Ref(ref mut r) => r,
PeekMut::Heap(ref mut r) => r.deref_mut(),
}
}
}
impl<'a, T: Ord + 'a> From<&'a mut T> for PeekMut<'a, T> {
fn from(t: &'a mut T) -> PeekMut<'a, T> { PeekMut::Ref(t) }
}
impl<'a, T: Ord + 'a> From<binary_heap::PeekMut<'a, T>> for PeekMut<'a, T> {
fn from(t: binary_heap::PeekMut<'a, T>) -> PeekMut<'a, T> {
PeekMut::Heap(t)
}
}
impl ResendStates {
fn peek_mut_next_record(&mut self) -> Option<PeekMut<SendRecord>> {
match *self {
ResendStates::Stalling {
ref mut to_send, ..
} => to_send.first_mut().map(|r| r.into()),
ResendStates::Connecting {
ref mut to_send, ..
}
| ResendStates::Normal { ref mut to_send }
| ResendStates::Disconnecting {
ref mut to_send, ..
} => to_send.peek_mut().map(|r| r.into()),
ResendStates::Dead { .. } => None,
}
}
fn get_packet_interval(&self, config: &ResendConfig) -> Option<Duration> {
match *self {
ResendStates::Connecting { .. } => Some(config.connecting_interval),
ResendStates::Normal { .. } | ResendStates::Dead { .. } => None,
ResendStates::Stalling { .. } => Some(config.stalling_interval),
ResendStates::Disconnecting { .. } => {
Some(config.disconnect_interval)
}
}
}
fn get_name(&self) -> &'static str {
match *self {
ResendStates::Connecting { .. } => "Connecting",
ResendStates::Normal { .. } => "Normal",
ResendStates::Stalling { .. } => "Stalling",
ResendStates::Dead { .. } => "Dead",
ResendStates::Disconnecting { .. } => "Disconnecting",
}
}
}
#[derive(Clone, Debug)]
pub struct ResendConfig {
pub connecting_interval: Duration,
pub connecting_timeout: Duration,
pub normal_timeout: Duration,
pub stalling_interval: Duration,
pub stalling_timeout: Duration,
pub dead_timeout: Duration,
pub disconnect_timeout: Duration,
pub disconnect_interval: Duration,
pub srtt: Duration,
pub srtt_dev: Duration,
pub max_send_queue_len: usize,
}
impl Default for ResendConfig {
fn default() -> Self {
ResendConfig {
connecting_interval: Duration::seconds(1),
connecting_timeout: Duration::seconds(5),
normal_timeout: Duration::seconds(10),
stalling_interval: Duration::seconds(5),
stalling_timeout: Duration::seconds(30),
dead_timeout: Duration::seconds(0),
disconnect_timeout: Duration::seconds(5),
disconnect_interval: Duration::seconds(1),
srtt: Duration::milliseconds(2500),
srtt_dev: Duration::milliseconds(0),
max_send_queue_len: 50,
}
}
}
pub struct ResendFuture<CM: ConnectionManager + 'static> {
data: Weak<Mutex<Data<CM>>>,
is_client: bool,
logger: Logger,
connections: LockedHashMap<CM::Key, ConnectionValue<CM::AssociatedData>>,
connection_key: CM::Key,
connection: ConnectionValueWeak<CM::AssociatedData>,
sink: mpsc::Sender<(SocketAddr, Bytes)>,
timeout: Delay,
state_timeout: Delay,
is_sending: bool,
}
impl<CM: ConnectionManager + 'static> ResendFuture<CM> {
pub fn new(
data: &Data<CM>,
datam: Weak<Mutex<Data<CM>>>,
connection_key: CM::Key,
) -> Self
{
let connection = data
.get_connection(&connection_key)
.expect("Connection for resender not found")
.downgrade();
Self {
data: datam,
logger: data.logger.clone(),
is_client: data.is_client,
connections: data.connections.clone(),
connection_key,
connection,
sink: data.udp_packet_sink.clone(),
timeout: Delay::new(Instant::now()),
state_timeout: Delay::new(Instant::now()),
is_sending: false,
}
}
}
impl<CM: ConnectionManager + 'static> Future for ResendFuture<CM> {
type Item = ();
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
if !self.connections.read().contains_key(&self.connection_key) {
return Ok(futures::Async::Ready(()));
}
if self.is_sending {
if let futures::Async::Ready(()) =
self.sink.poll_complete().map_err(|e| {
format_err!(
"Failed to poll_complete udp packet sink ({:?})",
e
)
})? {
self.is_sending = false;
} else {
return Ok(futures::Async::NotReady);
}
}
let con = match self.connection.upgrade() {
Some(c) => c,
None => return Ok(Async::Ready(())),
};
let mut con = con.mutex.lock();
let con = &mut con.1;
con.resender.resender_future_task = Some(task::current());
let now = Utc::now();
let now_naive = now.naive_utc();
enum StateChange {
Nothing,
EndConnection,
NewState(ResendStates),
}
let next_state = {
let resender = &mut con.resender;
match resender.state {
ResendStates::Connecting { ref start_time, .. } => {
if now_naive.signed_duration_since(start_time.naive_utc())
>= resender.config.connecting_timeout
{
StateChange::EndConnection
} else {
let dur = (*start_time
+ resender.config.connecting_timeout)
.naive_utc()
.signed_duration_since(now_naive);
let next = Instant::now() + dur.to_std().unwrap();
self.state_timeout.reset(next);
if let futures::Async::Ready(()) =
self.state_timeout.poll()?
{
task::current().notify();
}
StateChange::Nothing
}
}
ResendStates::Normal { .. } => StateChange::Nothing,
ResendStates::Stalling {
ref mut to_send,
ref start_time,
} => {
if now_naive.signed_duration_since(start_time.naive_utc())
>= resender.config.stalling_timeout
{
StateChange::NewState(ResendStates::Dead {
to_send: mem::replace(to_send, Vec::new()),
start_time: Utc::now(),
})
} else {
let dur = (*start_time
+ resender.config.stalling_timeout)
.naive_utc()
.signed_duration_since(now_naive);
let next = Instant::now() + dur.to_std().unwrap();
self.state_timeout.reset(next);
if let futures::Async::Ready(()) =
self.state_timeout.poll()?
{
task::current().notify();
}
StateChange::Nothing
}
}
ResendStates::Dead { ref start_time, .. } => {
if now_naive.signed_duration_since(start_time.naive_utc())
>= resender.config.dead_timeout
{
StateChange::EndConnection
} else {
let dur = (*start_time + resender.config.dead_timeout)
.naive_utc()
.signed_duration_since(now_naive);
let next = Instant::now() + dur.to_std().unwrap();
self.state_timeout.reset(next);
if let futures::Async::Ready(()) =
self.state_timeout.poll()?
{
task::current().notify();
}
StateChange::Nothing
}
}
ResendStates::Disconnecting { ref start_time, .. } => {
if now_naive.signed_duration_since(start_time.naive_utc())
>= resender.config.disconnect_timeout
{
StateChange::EndConnection
} else {
let dur = (*start_time
+ resender.config.disconnect_timeout)
.naive_utc()
.signed_duration_since(now_naive);
let next = Instant::now() + dur.to_std().unwrap();
self.state_timeout.reset(next);
if let futures::Async::Ready(()) =
self.state_timeout.poll()?
{
task::current().notify();
}
StateChange::Nothing
}
}
}
};
if let StateChange::NewState(next_state) = next_state {
con.resender.set_state(next_state);
task::current().notify();
return Ok(futures::Async::NotReady);
} else if let StateChange::EndConnection = next_state {
let state = con.resender.state.get_name();
let data = match self.data.upgrade() {
Some(d) => d,
None => return Ok(futures::Async::Ready(())),
};
let mut data = data.lock();
info!(self.logger, "Exiting connection because it is not responding";
"current state" => state);
data.remove_connection(&self.connection_key);
return Ok(futures::Async::NotReady);
}
let mut switch_to_stalling = false;
let packet_interval =
con.resender.state.get_packet_interval(&con.resender.config);
let mut rto;
let mut last_threshold;
#[allow(clippy::let_and_return)]
while let Some(packet) = {
rto = if let Some(interval) = packet_interval {
interval
} else {
con.resender.srtt + con.resender.srtt_dev * 4
};
last_threshold = now - rto;
let packet = if let Some(rec) =
&mut con.resender.state.peek_mut_next_record()
{
if rec.tries != 0 && rec.last > last_threshold {
let dur = rec
.last
.naive_utc()
.signed_duration_since(last_threshold.naive_utc());
let next = Instant::now() + dur.to_std().unwrap();
self.timeout.reset(next);
if let futures::Async::Ready(()) = self.timeout.poll()? {
task::current().notify();
}
return Ok(futures::Async::NotReady);
}
Some(rec.packet.clone())
} else {
None
};
packet
} {
if let futures::AsyncSink::NotReady(_) =
self.sink.start_send((con.address, packet)).map_err(|e| {
format_err!(
"Failed to poll_complete udp packet sink ({:?})",
e
)
})? {
break;
} else {
if let futures::Async::Ready(()) =
self.sink.poll_complete().map_err(|e| {
format_err!(
"Failed to poll_complete udp packet sink ({:?})",
e
)
})? {
self.is_sending = false;
} else {
self.is_sending = true;
}
let is_normal_state;
let p_id;
{
is_normal_state = if let ResendStates::Normal { .. } =
con.resender.state
{
true
} else {
false
};
let mut rec =
con.resender.state.peek_mut_next_record().unwrap();
p_id = rec.id.1;
if rec.tries != 0
&& con.resender.srtt
< con.resender.config.normal_timeout
{
con.resender.srtt = con.resender.srtt * 2;
}
rec.last = now;
rec.tries += 1;
if rec.tries != 1 {
let to_s = if self.is_client { "S" } else { "C" };
warn!(self.logger, "Resend";
"p_type" => ?rec.id.0,
"p_id" => rec.id.1,
"tries" => rec.tries,
"last" => %rec.last,
"to" => to_s,
"srtt" => %con.resender.srtt,
"srtt_dev" => %con.resender.srtt_dev,
"rto" => %rto,
"threshold" => %last_threshold,
);
}
}
let next = now + rto;
let dur =
next.naive_utc().signed_duration_since(now.naive_utc());
if is_normal_state && dur > con.resender.config.normal_timeout {
warn!(self.logger, "Max resend timeout exceeded";
"p_id" => p_id, "dur" => %dur);
switch_to_stalling = true;
break;
}
}
}
if switch_to_stalling {
let mut to_send: Vec<_> =
if let ResendStates::Normal { to_send, .. } =
&mut con.resender.state
{
mem::replace(to_send, Default::default()).into_vec()
} else {
unreachable!("Connection was not in normal state");
};
to_send.sort_by(|a, b| a.id.1.cmp(&b.id.1));
con.resender.set_state(ResendStates::Stalling {
to_send,
start_time: now,
});
task::current().notify();
}
Ok(futures::Async::NotReady)
}
}