1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
use factory::DefaultFactory;
use fibers::{BoxSpawn, Spawn};
use fibers_transport::{
    FixedPeerTransporter, RcTransporter, TcpListener, TcpTransport, UdpTransport, UdpTransporter,
};
use futures::{Async, Future, Poll, Stream};
use std::net::SocketAddr;

use self::core::ServerCore;
use auth::AuthParams;
use transport::{
    ChannelDataTcpTransporter, ChannelDataUdpTransporter, StunTcpTransporter, StunTransporter,
    StunUdpTransporter,
};
use turn_message::{TurnMessageDecoder, TurnMessageEncoder};
use Error;

mod core;

/// Server future that drives a `ServerCore` on top of a UDP transporter.
///
/// Instances are created with `UdpServer::start`. The value is itself a
/// future: it makes progress only while it is being polled.
#[derive(Debug)]
#[must_use = "future do nothing unless polled"]
pub struct UdpServer {
    /// Core built from the STUN and ChannelData UDP transporters; polling the
    /// server polls this core.
    core: ServerCore<StunUdpTransporter, ChannelDataUdpTransporter>,
}
impl UdpServer {
    pub fn start(
        bind_addr: SocketAddr,
        auth_params: AuthParams,
    ) -> impl Future<Item = Self, Error = Error> {
        UdpTransporter::bind(bind_addr)
            .map_err(|e| track!(Error::from(e)))
            .map(move |transporter| {
                let transporter = RcTransporter::new(transporter);
                let stun = StunUdpTransporter::new(StunTransporter::new(transporter.clone()));
                let channel_data = ChannelDataUdpTransporter::new(transporter);
                let core = ServerCore::new(stun, channel_data, auth_params);
                UdpServer { core }
            })
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.core
            .stun_transporter_ref()
            .inner_ref()
            .with_inner_ref(|x| x.local_addr())
    }
}
impl Future for UdpServer {
    type Item = ();
    type Error = Error;

    /// Polls the inner `ServerCore` and returns its result, adding tracking
    /// information to any error it reports.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let progress = self.core.poll();
        track!(progress)
    }
}

/// Server future that accepts TCP connections and spawns one `ServerCore`
/// per accepted connection.
///
/// Instances are created with `TcpServer::start`. The value is itself a
/// future: connections are accepted only while it is being polled.
#[derive(Debug)]
#[must_use = "future do nothing unless polled"]
pub struct TcpServer {
    /// Listener that yields a transporter for each accepted connection.
    listener: TcpListener<DefaultFactory<TurnMessageEncoder>, DefaultFactory<TurnMessageDecoder>>,
    /// Spawner used to run the per-connection `ServerCore` futures.
    spawner: BoxSpawn,
    /// Authentication parameters, cloned into every per-connection core.
    auth_params: AuthParams,
}
impl TcpServer {
    pub fn start<S>(
        spawner: S,
        bind_addr: SocketAddr,
        auth_params: AuthParams,
    ) -> impl Future<Item = Self, Error = Error>
    where
        S: Spawn + Send + 'static,
    {
        TcpListener::listen(bind_addr)
            .map_err(|e| track!(Error::from(e)))
            .map(move |listener| TcpServer {
                listener,
                spawner: spawner.boxed(),
                auth_params,
            })
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.listener.local_addr()
    }
}
impl Future for TcpServer {
    type Item = ();
    type Error = Error;

    /// Accepts every connection that is currently ready and spawns a dedicated
    /// `ServerCore` for each of them.
    ///
    /// Returns `Async::Ready(())` once the listener's stream has ended, and
    /// `Async::NotReady` when no further connection is ready at the moment.
    ///
    /// # Errors
    ///
    /// Errors reported by the listener itself are tracked and returned.
    /// An error on an individual connection does not surface here: it is
    /// written to standard error and ends only that connection's task.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while let Async::Ready(accepted) = track!(self.listener.poll())? {
            let transporter = match accepted {
                Some(transporter) => transporter,
                // The listener stream has ended, so the server is finished.
                None => return Ok(Async::Ready(())),
            };

            let peer = transporter.peer_addr();
            // Both the STUN and the ChannelData transporter are built from
            // handles to the same accepted connection.
            let transporter = RcTransporter::new(transporter);
            let stun = StunTcpTransporter::new(StunTransporter::new(transporter.clone()));
            let stun = FixedPeerTransporter::new(peer, (), stun);
            let channel_data = ChannelDataTcpTransporter::new(transporter);
            let channel_data = FixedPeerTransporter::new(peer, (), channel_data);
            let auth_params = self.auth_params.clone();

            // A failure on one client connection must not panic the task that
            // polls it: report the error and let only this connection end.
            let session = ServerCore::new(stun, channel_data, auth_params).map_err(move |e| {
                eprintln!("connection from {} terminated with an error: {}", peer, e);
            });
            self.spawner.spawn(session);
        }
        Ok(Async::NotReady)
    }
}