1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
use factory::DefaultFactory; use fibers::{BoxSpawn, Spawn}; use fibers_transport::{ FixedPeerTransporter, RcTransporter, TcpListener, TcpTransport, UdpTransport, UdpTransporter, }; use futures::{Async, Future, Poll, Stream}; use std::net::SocketAddr; use self::core::ServerCore; use auth::AuthParams; use transport::{ ChannelDataTcpTransporter, ChannelDataUdpTransporter, StunTcpTransporter, StunTransporter, StunUdpTransporter, }; use turn_message::{TurnMessageDecoder, TurnMessageEncoder}; use Error; mod core; #[derive(Debug)] #[must_use = "future do nothing unless polled"] pub struct UdpServer { core: ServerCore<StunUdpTransporter, ChannelDataUdpTransporter>, } impl UdpServer { pub fn start( bind_addr: SocketAddr, auth_params: AuthParams, ) -> impl Future<Item = Self, Error = Error> { UdpTransporter::bind(bind_addr) .map_err(|e| track!(Error::from(e))) .map(move |transporter| { let transporter = RcTransporter::new(transporter); let stun = StunUdpTransporter::new(StunTransporter::new(transporter.clone())); let channel_data = ChannelDataUdpTransporter::new(transporter); let core = ServerCore::new(stun, channel_data, auth_params); UdpServer { core } }) } pub fn local_addr(&self) -> SocketAddr { self.core .stun_transporter_ref() .inner_ref() .with_inner_ref(|x| x.local_addr()) } } impl Future for UdpServer { type Item = (); type Error = Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { track!(self.core.poll()) } } #[derive(Debug)] #[must_use = "future do nothing unless polled"] pub struct TcpServer { listener: TcpListener<DefaultFactory<TurnMessageEncoder>, DefaultFactory<TurnMessageDecoder>>, spawner: BoxSpawn, auth_params: AuthParams, } impl TcpServer { pub fn start<S>( spawner: S, bind_addr: SocketAddr, auth_params: AuthParams, ) -> impl Future<Item = Self, Error = Error> where S: Spawn + Send + 'static, { TcpListener::listen(bind_addr) .map_err(|e| track!(Error::from(e))) .map(move |listener| TcpServer { listener, spawner: spawner.boxed(), auth_params, }) } pub fn local_addr(&self) -> SocketAddr { self.listener.local_addr() } } impl Future for TcpServer { type Item = (); type Error = Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { while let Async::Ready(transporter) = track!(self.listener.poll())? { if let Some(transporter) = transporter { let peer = transporter.peer_addr(); let transporter = RcTransporter::new(transporter); let stun = StunTcpTransporter::new(StunTransporter::new(transporter.clone())); let stun = FixedPeerTransporter::new(peer, (), stun); let channel_data = ChannelDataTcpTransporter::new(transporter); let channel_data = FixedPeerTransporter::new(peer, (), channel_data); let auth_params = self.auth_params.clone(); self.spawner.spawn( ServerCore::new(stun, channel_data, auth_params).map_err(|e| panic!("{}", e)), ); } else { return Ok(Async::Ready(())); } } Ok(Async::NotReady) } }