Skip to main content

turn_server/server/
mod.rs

1pub mod transport;
2
3use std::{net::SocketAddr, sync::Arc};
4
5use ahash::{HashMap, HashMapExt};
6use anyhow::Result;
7use bytes::Bytes;
8use parking_lot::RwLock;
9
10use tokio::{
11    sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
12    task::JoinSet,
13};
14
15use crate::{
16    Service,
17    codec::message::methods::Method,
18    config::{Config, Interface},
19    server::transport::{Server, ServerOptions, Transport, tcp::TcpServer, udp::UdpServer},
20    statistics::Statistics,
21};
22
23pub async fn start_server(config: Config, service: Service, statistics: Statistics) -> Result<()> {
24    let exchanger = Exchanger::default();
25
26    let mut servers = JoinSet::new();
27
28    for interface in config.server.interfaces {
29        match interface {
30            Interface::Udp {
31                listen,
32                external,
33                idle_timeout,
34                mtu,
35            } => {
36                servers.spawn(UdpServer::start(
37                    ServerOptions {
38                        transport: Transport::Udp,
39                        idle_timeout,
40                        ssl: None,
41                        external,
42                        listen,
43                        mtu,
44                    },
45                    service.clone(),
46                    statistics.clone(),
47                    exchanger.clone(),
48                ));
49            }
50            Interface::Tcp {
51                listen,
52                external,
53                idle_timeout,
54                ssl,
55            } => {
56                servers.spawn(TcpServer::start(
57                    ServerOptions {
58                        transport: Transport::Tcp,
59                        idle_timeout,
60                        external,
61                        listen,
62                        mtu: 0,
63                        ssl,
64                    },
65                    service.clone(),
66                    statistics.clone(),
67                    exchanger.clone(),
68                ));
69            }
70        };
71    }
72
73    // As soon as one server exits, all servers will be exited to ensure the
74    // availability of all servers.
75    if let Some(res) = servers.join_next().await {
76        servers.abort_all();
77
78        return res?;
79    }
80
81    Ok(())
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85enum PayloadType {
86    Message(Method),
87    ChannelData,
88}
89
90type ExchangerSender = UnboundedSender<(Bytes, PayloadType)>;
91
92/// Handles packet forwarding between transport protocols.
93#[derive(Clone)]
94pub struct Exchanger(Arc<RwLock<HashMap<SocketAddr, ExchangerSender>>>);
95
96impl Default for Exchanger {
97    fn default() -> Self {
98        Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
99    }
100}
101
102impl Exchanger {
103    /// Get the socket reader for the route.
104    ///
105    /// Each transport protocol is layered according to its own socket, and
106    /// the data forwarded to this socket can be obtained by routing.
107    fn get_receiver(&self, interface: SocketAddr) -> UnboundedReceiver<(Bytes, PayloadType)> {
108        let (sender, receiver) = unbounded_channel();
109        self.0.write().insert(interface, sender);
110
111        receiver
112    }
113
114    /// Send data to dispatcher.
115    ///
116    /// By specifying the socket identifier and destination address, the route
117    /// is forwarded to the corresponding socket. However, it should be noted
118    /// that calling this function will not notify whether the socket exists.
119    /// If it does not exist, the data will be discarded by default.
120    fn send(&self, interface: &SocketAddr, ty: PayloadType, data: Bytes) {
121        let mut is_destroy = false;
122
123        {
124            if let Some(sender) = self.0.read().get(interface)
125                && sender.send((data, ty)).is_err()
126            {
127                is_destroy = true;
128            }
129        }
130
131        if is_destroy {
132            self.remove(interface);
133        }
134    }
135
136    /// delete socket.
137    pub fn remove(&self, interface: &SocketAddr) {
138        drop(self.0.write().remove(interface))
139    }
140}