turn_server/server/
mod.rs

1pub mod transport;
2
3use std::{net::SocketAddr, sync::Arc};
4
5use ahash::{HashMap, HashMapExt};
6use anyhow::Result;
7use bytes::Bytes;
8use parking_lot::RwLock;
9
10use tokio::{
11    sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
12    task::JoinSet,
13};
14
15use crate::{
16    Service,
17    codec::message::methods::Method,
18    config::{Config, Interface},
19    server::transport::{Server, ServerOptions, Transport, tcp::TcpServer, udp::UdpServer},
20    statistics::Statistics,
21};
22
23pub async fn start_server(config: Config, service: Service, statistics: Statistics) -> Result<()> {
24    let exchanger = Exchanger::default();
25
26    let mut servers = JoinSet::new();
27
28    for interface in config.server.interfaces {
29        match interface {
30            Interface::Udp {
31                listen,
32                external,
33                idle_timeout,
34                mtu,
35            } => {
36                servers.spawn(UdpServer::start(
37                    ServerOptions {
38                        transport: Transport::Udp,
39                        idle_timeout,
40                        ssl: None,
41                        external,
42                        listen,
43                        mtu,
44                    },
45                    service.clone(),
46                    statistics.clone(),
47                    exchanger.clone(),
48                ));
49            }
50            Interface::Tcp {
51                listen,
52                external,
53                idle_timeout,
54                ssl,
55            } => {
56                servers.spawn(TcpServer::start(
57                    ServerOptions {
58                        transport: Transport::Tcp,
59                        idle_timeout,
60                        external,
61                        listen,
62                        mtu: 0,
63                        ssl,
64                    },
65                    service.clone(),
66                    statistics.clone(),
67                    exchanger.clone(),
68                ));
69            }
70        };
71    }
72
73    // As soon as one server exits, all servers will be exited to ensure the
74    // availability of all servers.
75    if let Some(res) = servers.join_next().await {
76        servers.abort_all();
77
78        return res?;
79    }
80
81    Ok(())
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85enum PayloadType {
86    Message(Method),
87    ChannelData,
88}
89
90/// Handles packet forwarding between transport protocols.
91#[derive(Clone)]
92pub struct Exchanger(Arc<RwLock<HashMap<SocketAddr, UnboundedSender<(Bytes, PayloadType)>>>>);
93
94impl Default for Exchanger {
95    fn default() -> Self {
96        Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
97    }
98}
99
100impl Exchanger {
101    /// Get the socket reader for the route.
102    ///
103    /// Each transport protocol is layered according to its own socket, and
104    /// the data forwarded to this socket can be obtained by routing.
105    fn get_receiver(&self, interface: SocketAddr) -> UnboundedReceiver<(Bytes, PayloadType)> {
106        let (sender, receiver) = unbounded_channel();
107        self.0.write().insert(interface, sender);
108
109        receiver
110    }
111
112    /// Send data to dispatcher.
113    ///
114    /// By specifying the socket identifier and destination address, the route
115    /// is forwarded to the corresponding socket. However, it should be noted
116    /// that calling this function will not notify whether the socket exists.
117    /// If it does not exist, the data will be discarded by default.
118    fn send(&self, interface: &SocketAddr, ty: PayloadType, data: Bytes) {
119        let mut is_destroy = false;
120
121        {
122            if let Some(sender) = self.0.read().get(interface)
123                && sender.send((data, ty)).is_err()
124            {
125                is_destroy = true;
126            }
127        }
128
129        if is_destroy {
130            self.remove(interface);
131        }
132    }
133
134    /// delete socket.
135    pub fn remove(&self, interface: &SocketAddr) {
136        drop(self.0.write().remove(interface))
137    }
138}