#![deny(missing_docs)]
/// Period, in seconds, of the `permissions_pinger` interval; on every tick
/// the client re-sends a permission/channel request for each stored permission.
const PERM_REFRESH_INTERVAL: u64 = 180;
extern crate bytecodec;
#[macro_use]
extern crate stun_codec;
#[macro_use] extern crate trackable;
extern crate rand;
extern crate futures;
extern crate tokio;
use fnv::FnvHashSet;
use stun_codec::rfc8016::attributes::MobilityTicket;
use tokio::net as tokio_udp;
use tokio::time as tokio_timer;
use std::net::Ipv4Addr;
use std::net::SocketAddrV4;
use std::pin::Pin;
extern crate fnv;
#[macro_use]
extern crate slab_typesafe;
use stun_codec::{MessageDecoder, MessageEncoder};
use bytecodec::{DecodeExt, EncodeExt};
use std::net::SocketAddr;
use stun_codec::rfc5389::attributes::{
AlternateServer, ErrorCode, MessageIntegrity, Nonce, Realm, Software, Username,
XorMappedAddress,
};
use stun_codec::rfc5766::attributes::{
ChannelNumber, Data, Lifetime, RequestedTransport, XorPeerAddress, XorRelayAddress,
};
use self::attrs::Attribute;
use std::time::Duration;
use stun_codec::rfc5766::methods::{ALLOCATE, CHANNEL_BIND, CREATE_PERMISSION, REFRESH, SEND};
use stun_codec::{Message, MessageClass, TransactionId};
use tokio::time::Instant;
use futures::{task::Context, task::Poll, Future, Sink, Stream};
use fnv::FnvHashMap as HashMap;
/// STUN/TURN attribute set used by this client.
///
/// Invokes `define_attribute_enums!` with the names `Attribute`,
/// `AttributeDecoder` and `AttributeEncoder` over the attribute types listed
/// below (taken from the RFC 5389, RFC 5766 and RFC 8016 modules of
/// `stun_codec`). The rest of the file uses `Attribute` as the attribute
/// type parameter of `Message`, `MessageEncoder` and `MessageDecoder`.
mod attrs {
extern crate stun_codec;
use stun_codec::rfc5389::attributes::*;
use stun_codec::rfc5766::attributes::*;
use stun_codec::rfc8016::attributes::*;
// NOTE(review): presumably the macro also generates the matching decoder and
// encoder types under the second and third names - confirm against stun_codec.
define_attribute_enums!(
Attribute,
AttributeDecoder,
AttributeEncoder,
[
MappedAddress,
Username,
MessageIntegrity,
ErrorCode,
UnknownAttributes,
Realm,
Nonce,
XorMappedAddress,
Software,
AlternateServer,
Fingerprint,
ChannelNumber,
Lifetime,
XorPeerAddress,
Data,
XorRelayAddress,
EvenPort,
RequestedTransport,
DontFragment,
ReservationToken,
MobilityTicket
]
);
}
/// Error type used throughout this crate; an alias for `anyhow::Error`.
pub type Error = anyhow::Error;
use anyhow::anyhow;
use anyhow::bail;
use tokio_udp::UdpSocket;
use tokio_timer::{Interval, Sleep as Delay};
use slab_typesafe::Slab;
/// Options for creating a [`TurnClient`].
///
/// Create it with [`TurnClientBuilder::new`], adjust the public fields as
/// needed, then call `build_and_send_request` or
/// `restore_from_exported_parameters`.
pub struct TurnClientBuilder {
/// Address of the TURN server. All requests and data are sent here, and
/// datagrams arriving from any other address are reported as
/// `MessageFromTurnServer::ForeignPacket`.
pub turn_server: SocketAddr,
/// Username placed into the `Username` attribute of every signed request.
pub username: String,
/// Password used to compute the long-term-credential message integrity.
pub password: String,
/// Number of retry-timer expirations after which an unanswered request
/// fails with "Request timed out". `new` sets it to 10.
pub max_retries: usize,
/// Delay between (re)transmissions of an unanswered request.
/// `new` sets it to one second.
pub retry_interval: Duration,
/// Upper bound for the delay before the allocation is refreshed.
/// `new` sets it to 30 seconds.
pub refresh_interval: Duration,
/// If set, sent as the `Software` attribute of requests.
/// `new` sets it to `Some("SimpleRustTurnClient")`.
pub software: Option<&'static str>,
/// If true, `build_and_send_request` starts the client with an empty
/// mobility ticket, which is then attached to the allocate request.
/// `new` sets it to `false`.
pub enable_mobility: bool,
}
impl TurnClientBuilder {
pub fn new(turn_server: SocketAddr, username: String, password: String) -> Self {
TurnClientBuilder {
turn_server,
username,
password,
max_retries: 10,
retry_interval: Duration::from_secs(1),
refresh_interval: Duration::from_secs(30),
software: Some("SimpleRustTurnClient"),
enable_mobility: false,
}
}
/// Consumes the builder and the socket, producing a [`TurnClient`] with the
/// initial allocate request already queued.
///
/// The request is only queued here; it is written to the socket when the
/// client is polled.
///
/// # Panics
///
/// Panics if the initial request cannot be constructed (the result of
/// `send_allocate_request` is unwrapped).
pub fn build_and_send_request(self, udp: UdpSocket) -> TurnClient {
    // With mobility enabled the client starts out with an empty ticket.
    let mobility_ticket = self.enable_mobility.then(MobilityTicket::empty);
    let inflight = HashMap::with_capacity_and_hasher(2, Default::default());
    let sockaddr2perm = HashMap::with_capacity_and_hasher(1, Default::default());

    let mut client = TurnClient {
        opts: self,
        udp,
        inflight,
        when_to_renew_the_allocation: None,
        realm: None,
        nonce: None,
        permissions: Slab::with_capacity(1),
        sockaddr2perm,
        permissions_pinger: None,
        shutdown: false,
        buffered_input_message: None,
        mobility_ticket,
        mobility_refresh_in_progress: false,
    };
    client.send_allocate_request(false, true).unwrap();
    client
}
pub fn restore_from_exported_parameters(
self,
udp: UdpSocket,
params: &ExportedParameters,
) -> anyhow::Result<TurnClient> {
let mut permissions : Slab<PermissionHandle, Permission> = Slab::with_capacity(params.permissions.len());
let mut sockaddr2perm =
HashMap::with_capacity_and_hasher(params.permissions.len(), Default::default());
let mut channelized_permissions: HashMap<usize, &SocketAddr> =
HashMap::with_capacity_and_hasher(params.permissions.len(), Default::default());
let mut unchannelized_permissions: Vec<SocketAddr> = Vec::new();
let mut max_channel_plus_one = 0usize;
let mut extra_slots_in_slab: FnvHashSet<PermissionHandle> =
FnvHashSet::with_hasher(Default::default());
for (sa, ch) in ¶ms.permissions {
if let Some(x) = ch {
channelized_permissions.insert(*x as usize, sa);
max_channel_plus_one = max_channel_plus_one.max(*x as usize + 1);
} else {
unchannelized_permissions.push(*sa);
}
}
let max_slots_in_slab = max_channel_plus_one.max(params.permissions.len());
let mut unchannelized = unchannelized_permissions.into_iter();
for i in 0..max_slots_in_slab {
let ve = permissions.vacant_entry();
assert_eq!(ve.key().0, i);
use anyhow::Context;
if let Some(sa) = channelized_permissions.get(&i) {
ve.insert(Permission {
addr: **sa,
channelized: true,
creation_already_reported: true,
});
sockaddr2perm.insert(**sa, PermissionToken::from_channel_number(i as u16).context("Internal errror?")?);
} else {
if let Some(sa) = unchannelized.next() {
sockaddr2perm.insert(sa, PermissionToken::from_handle(ve.key(), false));
ve.insert(Permission {
addr: sa,
channelized: false,
creation_already_reported: true,
});
} else {
extra_slots_in_slab.insert(ve.key());
ve.insert(Permission {
addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
channelized: false,
creation_already_reported: false,
});
}
}
for extra_slot in extra_slots_in_slab.iter() {
permissions.remove(*extra_slot);
}
}
Ok(TurnClient {
opts: self,
udp,
inflight: HashMap::default(),
when_to_renew_the_allocation: Some(Box::pin(tokio::time::sleep_until(
tokio::time::Instant::now(),
))),
realm: if params.realm.is_empty() {
None
} else {
Some(Realm::new(params.realm.clone())?)
},
nonce: if params.nonce.is_empty() {
None
} else {
Some(Nonce::new(params.nonce.clone())?)
},
permissions,
sockaddr2perm,
permissions_pinger: if params.permissions.is_empty() {
None
} else {
Some(tokio_timer::interval(Duration::from_secs(
PERM_REFRESH_INTERVAL,
)))
},
shutdown: false,
buffered_input_message: None,
mobility_ticket: if params.mobility_ticket.is_empty() {
None
} else {
Some(MobilityTicket::new(params.mobility_ticket.clone())?)
},
mobility_refresh_in_progress: false,
})
}
}
/// Events produced by the [`TurnClient`] stream.
#[derive(Debug)]
pub enum MessageFromTurnServer {
/// The server answered the allocate request with a success response.
AllocationGranted {
/// Address from the `XorRelayAddress` attribute of the reply.
relay_address: SocketAddr,
/// Address from the `XorMappedAddress` attribute of the reply.
mapped_address: SocketAddr,
/// Description from the `Software` attribute of the reply, if present.
server_software: Option<String>,
/// Whether the client holds a mobility ticket after this reply.
mobility: bool,
},
/// The server answered with error code 300; carries the address from the
/// `AlternateServer` attribute.
RedirectedToAlternateServer(SocketAddr),
/// First success response to a permission/channel request for this peer.
PermissionCreated(SocketAddr),
/// First error response to a permission/channel request for this peer.
PermissionNotCreated(SocketAddr),
/// Data received from a peer, either as channel data or as a data indication.
RecvFrom(SocketAddr, Vec<u8>),
/// A refresh reply with zero lifetime was received; the client is shut down.
Disconnected,
/// A packet was consumed internally and carries nothing for the user.
APacketIsReceivedAndAutomaticallyHandled,
/// A datagram that was not recognised as belonging to this TURN session,
/// together with the address associated with it.
ForeignPacket(SocketAddr, Vec<u8>),
/// A refresh succeeded while a mobility refresh was in progress.
NetworkChange,
}
/// Transmission state of a request that is waiting for its response.
enum InflightRequestStatus {
/// The request bytes should be written to the socket on the next poll.
SendNow,
/// The request was sent; the timer fires when it is time to retransmit.
RetryLater(Pin<Box<Delay>>),
/// The retry limit was reached; the entry is dropped by `poll_next`.
TimedOut,
}
/// Selects how a permission for a peer is established.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum ChannelUsage {
/// Bind a channel to the peer (a `CHANNEL_BIND` request is sent).
WithChannel,
/// Only create a permission (a `CREATE_PERMISSION` request is sent).
JustPermission,
}
/// Commands accepted by the [`TurnClient`] sink.
#[derive(Debug)]
pub enum MessageToTurnServer {
/// Does nothing besides flushing queued requests.
Noop,
/// Registers a permission for the peer and queues the matching request.
AddPermission(SocketAddr, ChannelUsage),
/// Sends the payload to the peer, as channel data when the peer has a
/// channel and as a `SEND` indication otherwise.
SendTo(SocketAddr, Vec<u8>),
/// Queues a request carrying a zero `Lifetime`.
Disconnect,
/// Queues an allocate/refresh request with the mobility ticket attached
/// (if the client holds one).
ForceRefreshWithMobility,
}
/// Callbacks attached to an in-flight request and run when its response
/// arrives. A hook returning `Ok(Some(msg))` makes `handle_incoming_packet`
/// return `msg` immediately; `Ok(None)` lets the generic handling continue.
struct CompletionHooks {
/// Called when the response class is `SuccessResponse`.
success: Box<dyn FnMut(&mut TurnClient) -> Result<Option<MessageFromTurnServer>, Error> + Send>,
/// Called when the response class is `ErrorResponse`.
failure: Box<dyn FnMut(&mut TurnClient) -> Result<Option<MessageFromTurnServer>, Error> + Send>,
}
/// A request that has been queued or sent and not yet answered.
struct InflightRequest {
/// Where the request is in its send/retry cycle.
status: InflightRequestStatus,
/// The encoded request, re-sent verbatim on every retry.
data: Vec<u8>,
/// Number of retry-timer expirations so far.
retryctr: usize,
/// Optional callbacks to run when the response arrives.
completion_hook: Option<CompletionHooks>,
}
// Declares `PermissionHandle`, the key type of the `permissions` slab.
declare_slab_token!(PermissionHandle);
/// Compact reference to a permission.
///
/// The low 31 bits hold the slab index of the permission. The top bit is set
/// for permissions that are *not* bound to a channel, which pushes their value
/// above the channel range so that `as_channel_number` yields `None` for them.
#[derive(Clone, Copy)]
struct PermissionToken(u32);

impl PermissionToken {
    /// Returns the TURN channel number (`0x4000 + index`) for a channelized
    /// token, or `None` when the stored value is above `0x3FFE`.
    pub fn as_channel_number(&self) -> Option<u16> {
        let raw = self.0;
        if raw > 0x3FFE {
            return None;
        }
        Some(0x4000 + (raw as u16))
    }

    /// Builds a channelized token from a channel number in `0x4000..=0x7FFE`;
    /// any other number gives `None`.
    pub fn from_channel_number(n: u16) -> Option<Self> {
        if (0x4000..=0x7FFE).contains(&n) {
            Some(PermissionToken(u32::from(n) - 0x4000))
        } else {
            None
        }
    }

    /// Returns the slab handle encoded in the low 31 bits.
    pub fn as_handle(&self) -> PermissionHandle {
        let index = self.0 & 0x7FFF_FFFF;
        PermissionHandle(index as usize)
    }

    /// Builds a token for the given slab handle.
    ///
    /// # Panics
    ///
    /// Panics if the handle does not fit into 31 bits.
    pub fn from_handle(ph: PermissionHandle, channelized: bool) -> PermissionToken {
        assert!(ph.0 < 0x80000000);
        let index = ph.0 as u32;
        let flag = if channelized { 0 } else { 0x8000_0000 };
        PermissionToken(index | flag)
    }
}
/// One permission (optionally with a bound channel) held by the client.
struct Permission {
/// Peer address the permission is for.
addr: SocketAddr,
/// Whether the permission is maintained with `CHANNEL_BIND` requests
/// (true) or `CREATE_PERMISSION` requests (false).
channelized: bool,
/// Set once `PermissionCreated`/`PermissionNotCreated` has been emitted,
/// so that later refreshes do not report it again.
creation_already_reported: bool,
}
/// Snapshot of a client's session state, produced by
/// `TurnClient::export_state` and accepted by
/// `TurnClientBuilder::restore_from_exported_parameters`.
#[derive(Debug, PartialEq, Eq)]
pub struct ExportedParameters {
/// Realm text; empty when the client has no realm.
pub realm: String,
/// Nonce value; empty when the client has no nonce.
pub nonce: String,
/// Mobility ticket bytes; empty when the client has no ticket.
pub mobility_ticket: Vec<u8>,
/// Peer addresses with permissions, each with its channel number if the
/// permission is bound to a channel.
pub permissions: Vec<(SocketAddr, Option<u16>)>,
}
/// TURN client state machine.
///
/// It is a `Stream` of [`MessageFromTurnServer`] events and a `Sink` for
/// [`MessageToTurnServer`] commands.
pub struct TurnClient {
/// Options the client was built with.
opts: TurnClientBuilder,
/// Socket used for all traffic with the TURN server.
udp: UdpSocket,
/// Requests awaiting a response, keyed by transaction id.
inflight: HashMap<TransactionId, InflightRequest>,
/// Timer for the next allocation refresh; `None` while no allocation
/// has been granted (or after it was released).
when_to_renew_the_allocation: Option<Pin<Box<Delay>>>,
/// Realm learned from the server's 401 response (or restored).
realm: Option<Realm>,
/// Nonce learned from the server (or restored).
nonce: Option<Nonce>,
/// All permissions, indexed by `PermissionHandle`.
permissions: Slab<PermissionHandle, Permission>,
/// Lookup from peer address to its permission token.
sockaddr2perm: HashMap<SocketAddr, PermissionToken>,
/// Interval driving periodic permission refreshes; created lazily.
permissions_pinger: Option<Interval>,
/// Set after the allocation was released; ends the stream and makes the
/// sink reject further messages.
shutdown: bool,
/// Message accepted by `start_send` that `poll_flush` has yet to process.
buffered_input_message: Option<MessageToTurnServer>,
/// Current mobility ticket, if mobility is in use.
mobility_ticket: Option<MobilityTicket>,
/// True between a 437 error on refresh and the following successful refresh.
mobility_refresh_in_progress: bool,
}
impl TurnClient {
pub fn into_udp_socket(self) -> UdpSocket {
self.udp
}
pub fn export_state(&self) -> ExportedParameters {
ExportedParameters {
realm: self
.realm
.as_ref()
.map(|x| x.text())
.unwrap_or_default()
.to_owned(),
nonce: self
.nonce
.as_ref()
.map(|x| x.value())
.unwrap_or_default()
.to_owned(),
mobility_ticket: self
.mobility_ticket
.as_ref()
.map(|x| x.data().to_owned())
.unwrap_or_default(),
permissions: self
.permissions
.iter()
.map(|p| {
(
p.1.addr,
if p.1.channelized {
match self.sockaddr2perm.get(&p.1.addr) {
Some(pt) => pt.as_channel_number(),
None => None,
}
} else {
None
},
)
})
.collect(),
}
}
}
/// Produces a transaction id from 12 bytes drawn from the thread-local RNG.
fn gen_transaction_id() -> TransactionId {
    use rand::Rng;
    let mut rng = rand::thread_rng();
    let random_bytes: [u8; 12] = rng.gen();
    TransactionId::new(random_bytes)
}
impl TurnClient {
/// Queues an `ALLOCATE` or `REFRESH` request.
///
/// `REFRESH` is used when an allocation already exists (the renewal timer is
/// set) or a mobility refresh is in progress; otherwise `ALLOCATE` is used.
/// With `shutdown` set, a zero `Lifetime` is attached. With `enable_mobility`
/// set, the current mobility ticket (if any) is attached.
fn send_allocate_request(
    &mut self,
    shutdown: bool,
    enable_mobility: bool,
) -> Result<(), Error> {
    let transid = gen_transaction_id();
    let have_allocation = self.when_to_renew_the_allocation.is_some();
    let method = if have_allocation || self.mobility_refresh_in_progress {
        REFRESH
    } else {
        ALLOCATE
    };

    let mut message: Message<Attribute> = Message::new(MessageClass::Request, method, transid);
    if let Some(description) = self.opts.software {
        let software = Software::new(description.to_owned())?;
        message.add_attribute(Attribute::Software(software));
    }
    if method == ALLOCATE {
        // 17 is the protocol number requested as transport.
        let transport = RequestedTransport::new(17);
        message.add_attribute(Attribute::RequestedTransport(transport));
    }
    if let (true, Some(ticket)) = (enable_mobility, &self.mobility_ticket) {
        message.add_attribute(Attribute::MobilityTicket(ticket.clone()));
    }
    if shutdown {
        let zero = Lifetime::new(Duration::from_secs(0))?;
        message.add_attribute(Attribute::Lifetime(zero));
    }

    self.sign_request(&mut message)?;
    self.file_request(transid, message, None)
}
/// Converts the lifetime granted by the server into the delay before the
/// next refresh: lifetimes below 90 seconds give 5 seconds, longer ones are
/// shortened by 60 seconds, and the result is capped at `refresh_interval`.
fn process_alloc_lifetime(&self, lt: Duration) -> Duration {
    let shortened = if lt >= Duration::from_secs(90) {
        lt - Duration::from_secs(60)
    } else {
        Duration::from_secs(5)
    };
    shortened.min(self.opts.refresh_interval)
}
/// Adds the authentication attributes to `message`.
///
/// `Username` is always added. `Realm`, `Nonce` and `MessageIntegrity`
/// (long-term credential) are added only when both a realm and a nonce are
/// known.
fn sign_request(&self, message: &mut Message<Attribute>) -> Result<(), Error> {
    let username = Username::new(self.opts.username.clone())?;
    message.add_attribute(Attribute::Username(username.clone()));

    let (realm, nonce) = match (&self.realm, &self.nonce) {
        (Some(realm), Some(nonce)) => (realm, nonce),
        _ => return Ok(()),
    };
    message.add_attribute(Attribute::Realm(realm.clone()));
    message.add_attribute(Attribute::Nonce(nonce.clone()));

    // The integrity value is computed over the message as built so far.
    let integrity = MessageIntegrity::new_long_term_credential(
        &*message,
        &username,
        realm,
        self.opts.password.as_str(),
    )?;
    message.add_attribute(Attribute::MessageIntegrity(integrity));
    Ok(())
}
/// Encodes `message` and registers it as an in-flight request to be sent on
/// the next poll. Does nothing once the client has been shut down.
fn file_request(
    &mut self,
    transid: TransactionId,
    message: Message<Attribute>,
    completion_hook: Option<CompletionHooks>,
) -> Result<(), Error> {
    if !self.shutdown {
        let data = MessageEncoder::new().encode_into_bytes(message)?;
        let request = InflightRequest {
            status: InflightRequestStatus::SendNow,
            data,
            retryctr: 0,
            completion_hook,
        };
        self.inflight.insert(transid, request);
    }
    Ok(())
}
fn send_perm_request(&mut self, h: PermissionHandle) -> Result<(), Error> {
let p = &self.permissions[h];
let transid = gen_transaction_id();
let method = if p.channelized {
CHANNEL_BIND
} else {
CREATE_PERMISSION
};
let mut message: Message<Attribute> = Message::new(MessageClass::Request, method, transid);
if let Some(s) = self.opts.software {
message.add_attribute(Attribute::Software(Software::new(s.to_owned())?));
}
message.add_attribute(Attribute::XorPeerAddress(XorPeerAddress::new(p.addr)));
if p.channelized {
let chn = PermissionToken::from_handle(h, true)
.as_channel_number()
.ok_or(anyhow!("Channel number overflow"))?;
message.add_attribute(Attribute::ChannelNumber(ChannelNumber::new(chn)?));
}
let hooks = CompletionHooks {
success: Box::new(move |_self| {
let p = &mut _self.permissions[h];
let msg = if p.creation_already_reported {
MessageFromTurnServer::APacketIsReceivedAndAutomaticallyHandled
} else {
p.creation_already_reported = true;
MessageFromTurnServer::PermissionCreated(p.addr)
};
Ok(Some(msg))
}),
failure: Box::new(move |_self| {
let p = &mut _self.permissions[h];
let msg = if p.creation_already_reported {
MessageFromTurnServer::APacketIsReceivedAndAutomaticallyHandled
} else {
p.creation_already_reported = true;
MessageFromTurnServer::PermissionNotCreated(p.addr)
};
Ok(Some(msg))
}),
};
self.sign_request(&mut message)?;
self.file_request(transid, message, Some(hooks))?;
Ok(())
}
/// Sends `data` to peer `sa` through the TURN server.
///
/// When the peer has a channel, the payload is framed as channel data
/// (2-byte channel number, 2-byte length, payload). Otherwise it is wrapped
/// into a `SEND` indication with `XorPeerAddress` and `Data` attributes.
///
/// Returns `Pending` when the socket is not ready, and an error when the
/// socket fails, encoding fails, or the datagram was only partially sent.
fn send_data_indication(
    &mut self,
    cx: &mut Context,
    sa: SocketAddr,
    data: &[u8],
) -> Poll<Result<(), Error>> {
    let channel = self
        .sockaddr2perm
        .get(&sa)
        .and_then(|token| token.as_channel_number());

    let packet: Vec<u8> = match channel {
        Some(cn) => {
            let mut framed = Vec::with_capacity(data.len() + 4);
            framed.extend_from_slice(&cn.to_be_bytes());
            // Only the low 16 bits of the length go into the header.
            framed.extend_from_slice(&(data.len() as u16).to_be_bytes());
            framed.extend_from_slice(data);
            framed
        }
        None => {
            let transid = gen_transaction_id();
            let mut message: Message<Attribute> =
                Message::new(MessageClass::Indication, SEND, transid);
            message.add_attribute(Attribute::XorPeerAddress(XorPeerAddress::new(sa)));
            message.add_attribute(Attribute::Data(Data::new(data.to_vec())?));
            MessageEncoder::new().encode_into_bytes(message)?
        }
    };

    match self.udp.poll_send_to(cx, &packet[..], self.opts.turn_server) {
        Poll::Pending => Poll::Pending,
        Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
        Poll::Ready(Ok(sent)) if sent == packet.len() => Poll::Ready(Ok(())),
        Poll::Ready(Ok(_)) => {
            Poll::Ready(Err(anyhow!("Invalid length of a sent UDP datagram")))
        }
    }
}
/// Interprets one datagram received from the TURN server and updates the
/// client state accordingly.
///
/// Processing order:
/// 1. Channel data (first byte in `0x40..=0x7F`) for a known permission is
///    returned as `RecvFrom`, carrying exactly the number of payload bytes
///    announced in the channel data header.
/// 2. Anything too short or otherwise unrecognised is returned as
///    `ForeignPacket`.
/// 3. Data indications are returned as `RecvFrom`.
/// 4. Responses are matched against the in-flight requests; unmatched ones
///    are returned as `ForeignPacket`. Completion hooks, if any, run first.
/// 5. Remaining responses are handled as allocate replies (while no
///    allocation exists) or as refresh replies (afterwards).
///
/// # Errors
///
/// Fails on undecodable messages, missing mandatory attributes, unexpected
/// message classes or methods, authentication failure, and error codes that
/// are not handled automatically.
fn handle_incoming_packet(&mut self, buf: &[u8]) -> Result<MessageFromTurnServer, Error> {
    use self::MessageFromTurnServer::*;
    use stun_codec::MessageClass::{ErrorResponse, Indication, Request, SuccessResponse};

    let mut foreign_packet = false;
    if buf.len() < 4 {
        foreign_packet = true;
    } else if buf[0] >= 0x40 && buf[0] <= 0x7F {
        let chnum = (buf[0] as u16) << 8 | (buf[1] as u16);
        let len = (buf[2] as u16) << 8 | (buf[3] as u16);
        let payload_end = (len as usize) + 4;
        match PermissionToken::from_channel_number(chnum) {
            Some(token) if buf.len() >= payload_end => {
                if let Some(p) = self.permissions.get(token.as_handle()) {
                    // Bytes after the announced length are padding and must
                    // not be handed to the application.
                    return Ok(MessageFromTurnServer::RecvFrom(
                        p.addr,
                        buf[4..payload_end].to_vec(),
                    ));
                } else {
                    foreign_packet = true;
                }
            }
            _ => foreign_packet = true,
        }
    }
    if buf.len() < 18 {
        foreign_packet = true;
    }
    if foreign_packet {
        return Ok(ForeignPacket(self.opts.turn_server, buf.to_vec()));
    }

    let mut decoder = MessageDecoder::<Attribute>::new();
    let decoded = decoder
        .decode_from_bytes(buf)?
        .map_err(|_| anyhow!("Broken TURN reply"))?;
    let tid = decoded.transaction_id();

    if decoded.class() == Indication {
        let pa = decoded
            .get_attribute::<XorPeerAddress>()
            .ok_or(anyhow!("No XorPeerAddress in data indication"))?;
        let data = decoded
            .get_attribute::<Data>()
            .ok_or(anyhow!("No Data attribute in indication"))?;
        return Ok(MessageFromTurnServer::RecvFrom(
            pa.address(),
            data.data().to_vec(),
        ));
    }

    // A response is only meaningful if it answers a request of ours.
    match self.inflight.remove(&tid) {
        None => return Ok(ForeignPacket(self.opts.turn_server, buf.to_vec())),
        Some(rq) => {
            if let Some(mut h) = rq.completion_hook {
                match decoded.class() {
                    Request => bail!("Server replied to our request with another request?"),
                    Indication => bail!("Server replied to our request with an indication?"),
                    SuccessResponse => {
                        if let Some(ret) = (*h.success)(self)? {
                            return Ok(ret);
                        }
                    }
                    ErrorResponse => {
                        if let Some(ret) = (*h.failure)(self)? {
                            return Ok(ret);
                        }
                    }
                }
            }
        }
    }

    if self.when_to_renew_the_allocation.is_none() {
        // No allocation yet: this is a reply to the allocate request.
        match decoded.class() {
            SuccessResponse => {
                let ra = decoded
                    .get_attribute::<XorRelayAddress>()
                    .ok_or(anyhow!("No XorRelayAddress in reply"))?;
                let ma = decoded
                    .get_attribute::<XorMappedAddress>()
                    .ok_or(anyhow!("No XorMappedAddress in reply"))?;
                let sw = decoded
                    .get_attribute::<Software>()
                    .as_ref()
                    .map(|x| x.description());
                let lt = decoded
                    .get_attribute::<Lifetime>()
                    .ok_or(anyhow!("No Lifetime in reply"))?;
                // The old ticket is dropped; a ticket is kept only if the
                // server supplied one in this reply.
                if let Some(_mt) = self.mobility_ticket.take() {
                    if let Some(mt_reply) = decoded.get_attribute::<MobilityTicket>() {
                        self.mobility_ticket = Some(mt_reply.clone());
                    }
                }
                let lt = self.process_alloc_lifetime(lt.lifetime());
                self.when_to_renew_the_allocation = Some(Box::pin(tokio::time::sleep_until(
                    tokio::time::Instant::now() + lt,
                )));
                let ret = AllocationGranted {
                    relay_address: ra.address(),
                    mapped_address: ma.address(),
                    server_software: sw.map(|x| x.to_owned()),
                    mobility: self.mobility_ticket.is_some(),
                };
                return Ok(ret);
            }
            ErrorResponse => {
                let ec = decoded
                    .get_attribute::<ErrorCode>()
                    .ok_or(anyhow!("ErrorResponse without ErrorCode?"))?
                    .code();
                match ec {
                    401 => {
                        // A second 401 after we already had a nonce means the
                        // credentials were rejected.
                        if self.nonce.is_some() {
                            bail!("Authentication failed");
                        }
                        let re = decoded
                            .get_attribute::<Realm>()
                            .ok_or(anyhow!("Missing Realm in NotAuthorized response"))?;
                        let no = decoded
                            .get_attribute::<Nonce>()
                            .ok_or(anyhow!("Missing Nonce in NotAuthorized response"))?;
                        self.realm = Some(re.clone());
                        self.nonce = Some(no.clone());
                        self.send_allocate_request(false, true)?;
                    }
                    300 => {
                        let ta = decoded
                            .get_attribute::<AlternateServer>()
                            .ok_or(anyhow!("Redirect without AlternateServer"))?;
                        return Ok(RedirectedToAlternateServer(ta.address()));
                    }
                    _ => {
                        Err(anyhow!("Unknown error code from TURN: {}", ec))?;
                    }
                }
            }
            Indication => {
                bail!("Indication when not allocated anything")
            }
            Request => {
                bail!("Received a Request instead of Response from server")
            }
        }
    } else {
        // An allocation exists: this is a reply to a refresh-type request.
        match decoded.class() {
            SuccessResponse => match decoded.method() {
                REFRESH => {
                    let lt = decoded
                        .get_attribute::<Lifetime>()
                        .ok_or(anyhow!("No Lifetime in reply"))?;
                    if lt.lifetime() == Duration::from_secs(0) {
                        // The allocation has been released.
                        self.when_to_renew_the_allocation = None;
                        self.shutdown = true;
                        return Ok(MessageFromTurnServer::Disconnected);
                    }
                    let lt = self.process_alloc_lifetime(lt.lifetime());
                    if let Some(_mt) = self.mobility_ticket.take() {
                        if let Some(mt_reply) = decoded.get_attribute::<MobilityTicket>() {
                            self.mobility_ticket = Some(mt_reply.clone());
                        }
                    }
                    self.when_to_renew_the_allocation =
                        Some(Box::pin(tokio_timer::sleep_until(Instant::now() + lt)));
                    if self.mobility_refresh_in_progress {
                        self.mobility_refresh_in_progress = false;
                        return Ok(MessageFromTurnServer::NetworkChange);
                    }
                }
                CREATE_PERMISSION => {
                    bail!("Reached unreachable code: CREATE_PERMISSION should be handled elsewhere")
                }
                x => {
                    bail!("Not implemented: success response for {:?}", x)
                }
            },
            ErrorResponse => {
                let ec = decoded
                    .get_attribute::<ErrorCode>()
                    .ok_or(anyhow!("ErrorResponse without ErrorCode?"))?
                    .code();
                if ec == 438 && !self.mobility_refresh_in_progress {
                    // Stale nonce: adopt the new one and retry.
                    let new_nonce = decoded
                        .get_attribute::<Nonce>()
                        .ok_or(anyhow!("Stale Nonce error without new Nonce?"))?;
                    self.nonce = Some(new_nonce.clone());
                    self.send_allocate_request(self.shutdown, false)?;
                    return Ok(MessageFromTurnServer::APacketIsReceivedAndAutomaticallyHandled);
                }
                if decoded.method() == REFRESH
                    && ec == 437
                    && !self.mobility_refresh_in_progress
                {
                    // Allocation mismatch: retry once with the mobility
                    // ticket, if we have one.
                    if self.mobility_ticket.is_some() {
                        self.mobility_refresh_in_progress = true;
                        self.send_allocate_request(false, true)?;
                        return Ok(
                            MessageFromTurnServer::APacketIsReceivedAndAutomaticallyHandled,
                        );
                    }
                }
                bail!("Error from TURN: {}", ec);
            }
            Indication => {
                bail!("Not implemented: handling indications");
            }
            Request => {
                bail!("Received a Request instead of Response from server");
            }
        }
    }
    Ok(MessageFromTurnServer::APacketIsReceivedAndAutomaticallyHandled)
}
}
// Stream side of the client: receives datagrams, (re)transmits queued
// requests and drives the refresh timers.
impl Stream for TurnClient {
type Item = Result<MessageFromTurnServer, Error>;
/// Polls the client for the next event.
///
/// Each pass of the main loop: (1) tries to receive one datagram and, if
/// there is one, returns the resulting event; (2) sends or retries in-flight
/// requests; (3) checks the allocation refresh timer; (4) checks the
/// permission refresh interval. Returns `Pending` when nothing progressed,
/// and `None` once the client has been shut down.
fn poll_next(
self: Pin<&mut TurnClient>,
cx: &mut Context,
) -> Poll<Option<Result<MessageFromTurnServer, Error>>> {
let this: &mut TurnClient = Pin::into_inner(self);
'main: loop {
// After shutdown the stream is finished.
if this.shutdown {
return Poll::Ready(None);
}
// Receive buffer for a single datagram.
let mut buf = [0; 1560];
let mut buf = tokio::io::ReadBuf::new(&mut buf[..]);
match this.udp.poll_recv_from(cx, &mut buf) {
Poll::Pending => (),
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(addr)) => {
let buf = buf.filled();
// Datagrams from anyone but the TURN server are passed through untouched.
if addr != this.opts.turn_server {
return Poll::Ready(Some(Ok(MessageFromTurnServer::ForeignPacket(
addr,
buf.to_vec(),
))));
}
let ret = this.handle_incoming_packet(buf)?;
return Poll::Ready(Some(Ok(ret)));
}
}
// Requests found in the TimedOut state are collected here and removed
// after the iteration, since the map cannot be modified while iterating.
let mut remove_this_stale_rqs = vec![];
for (k, rq) in &mut this.inflight {
match &mut rq.status {
InflightRequestStatus::TimedOut => {
remove_this_stale_rqs.push(*k);
}
InflightRequestStatus::SendNow => {
match this
.udp
.poll_send_to(cx, &rq.data[..], this.opts.turn_server)
{
Poll::Ready(Err(e)) => Err(e)?,
Poll::Pending => (),
Poll::Ready(Ok(len)) => {
assert_eq!(len, rq.data.len());
// Sent: arm the retry timer and restart the main loop.
let d = tokio_timer::sleep_until(
Instant::now() + this.opts.retry_interval,
);
rq.status = InflightRequestStatus::RetryLater(Box::pin(d));
continue 'main;
}
}
}
InflightRequestStatus::RetryLater(ref mut d) => match d.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(()) => {
rq.retryctr += 1;
if rq.retryctr >= this.opts.max_retries {
// Mark the entry for removal on a later poll and report the timeout now.
rq.status = InflightRequestStatus::TimedOut;
Err(anyhow!("Request timed out"))?;
} else {
rq.status = InflightRequestStatus::SendNow;
continue 'main;
}
}
},
}
}
for rm in remove_this_stale_rqs {
this.inflight.remove(&rm);
}
// Allocation refresh: when the timer fires, re-arm it and queue a request.
if let Some(x) = &mut this.when_to_renew_the_allocation {
match x.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(()) => {
let ri = this.opts.refresh_interval;
x.as_mut().reset(Instant::now() + ri);
this.send_allocate_request(false, false)?;
continue 'main;
}
}
}
// Permission refresh: on each tick, re-request every stored permission.
let mut ids_to_refresh = vec![];
if let Some(pp) = &mut this.permissions_pinger {
match pp.poll_tick(cx) {
Poll::Pending => (),
Poll::Ready(_instant) => {
for (h, _) in this.permissions.iter() {
ids_to_refresh.push(h);
}
}
}
}
if !ids_to_refresh.is_empty() {
for h in ids_to_refresh {
this.send_perm_request(h)?;
}
continue 'main;
}
// Nothing made progress in this pass.
return Poll::Pending; } }
}
/// Sink side of the client: accepts [`MessageToTurnServer`] commands.
///
/// At most one message is buffered; `poll_ready`, `poll_flush` and
/// `poll_close` all run the same flushing logic.
impl Sink<MessageToTurnServer> for TurnClient {
    type Error = Error;

    /// Ready once the previously buffered message has been processed.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush(cx)
    }

    /// Buffers `msg` for processing by the next flush.
    ///
    /// # Errors
    ///
    /// Fails after the client has been shut down.
    ///
    /// # Panics
    ///
    /// Panics if a message is still buffered, i.e. the caller did not wait
    /// for `poll_ready` to succeed.
    fn start_send(mut self: Pin<&mut Self>, msg: MessageToTurnServer) -> Result<(), Error> {
        if self.shutdown {
            return Err(anyhow!("TURN client received a shutdown request"));
        }
        if self.buffered_input_message.is_some() {
            panic!("<TurnClient as Sink>::start_send called without prior poll_ready");
        }
        self.buffered_input_message = Some(msg);
        Ok(())
    }

    /// Sends every request that is waiting for its first transmission, then
    /// processes the buffered message, if any.
    ///
    /// Returns `Pending` when the socket cannot accept a datagram; in that
    /// case the buffered message is kept and retried on the next call.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this: &mut TurnClient = Pin::into_inner(self);
        'main: loop {
            for rq in this.inflight.values_mut() {
                match &mut rq.status {
                    // Retries and timeouts are driven by the stream side.
                    InflightRequestStatus::TimedOut => {}
                    InflightRequestStatus::RetryLater(_) => {}
                    InflightRequestStatus::SendNow => {
                        match this
                            .udp
                            .poll_send_to(cx, &rq.data[..], this.opts.turn_server)
                        {
                            Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(Ok(len)) => {
                                assert_eq!(len, rq.data.len());
                                let d = tokio_timer::sleep_until(
                                    Instant::now() + this.opts.retry_interval,
                                );
                                rq.status = InflightRequestStatus::RetryLater(Box::pin(d));
                            }
                        }
                    }
                }
            }

            let msg = match this.buffered_input_message.take() {
                Some(x) => x,
                None => return Poll::Ready(Ok(())),
            };
            use self::MessageToTurnServer::*;
            match msg {
                Noop => (),
                AddPermission(sa, chusage) => {
                    if this.permissions_pinger.is_none() {
                        this.permissions_pinger = Some(tokio_timer::interval(
                            Duration::from_secs(PERM_REFRESH_INTERVAL),
                        ));
                    }
                    let channelized = chusage == ChannelUsage::WithChannel;
                    if channelized && this.permissions.len() >= 0x3FFE {
                        return Poll::Ready(Err(anyhow!(
                            "There are too many permissions/channels to open another channel"
                        )));
                    }
                    let p = Permission {
                        addr: sa,
                        channelized,
                        creation_already_reported: false,
                    };
                    let id = this.permissions.insert(p);
                    this.sockaddr2perm
                        .insert(sa, PermissionToken::from_handle(id, channelized));
                    this.send_perm_request(id)?;
                    // Loop again so the freshly queued request is sent.
                    continue 'main;
                }
                SendTo(sa, ref data) => match this.send_data_indication(cx, sa, &data[..]) {
                    Poll::Ready(Ok(())) => (),
                    // A failed datagram is dropped rather than retried, so a
                    // persistent error cannot wedge the sink.
                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                    Poll::Pending => {
                        // Nothing was sent: keep the message for the next
                        // flush instead of silently losing it.
                        this.buffered_input_message = Some(msg);
                        return Poll::Pending;
                    }
                },
                Disconnect => {
                    this.send_allocate_request(true, false)?;
                    continue 'main;
                }
                ForceRefreshWithMobility => {
                    this.send_allocate_request(false, true)?;
                    continue 'main;
                }
            }
            return Poll::Ready(Ok(()));
        }
    }

    /// Flushes pending work; it does not release the allocation by itself.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush(cx)
    }
}
#[cfg(test)]
mod tests {
    /// Smoke test confirming that the test harness runs.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}