turn_server/server/
mod.rs1pub mod transport;
2
3use std::{net::SocketAddr, sync::Arc};
4
5use ahash::{HashMap, HashMapExt};
6use anyhow::Result;
7use bytes::Bytes;
8use parking_lot::RwLock;
9
10use tokio::{
11 sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
12 task::JoinSet,
13};
14
15use crate::{
16 Service,
17 codec::message::methods::Method,
18 config::{Config, Interface},
19 server::transport::{Server, ServerOptions, Transport, tcp::TcpServer, udp::UdpServer},
20 statistics::Statistics,
21};
22
23pub async fn start_server(config: Config, service: Service, statistics: Statistics) -> Result<()> {
24 let exchanger = Exchanger::default();
25
26 let mut servers = JoinSet::new();
27
28 for interface in config.server.interfaces {
29 match interface {
30 Interface::Udp {
31 listen,
32 external,
33 idle_timeout,
34 mtu,
35 } => {
36 servers.spawn(UdpServer::start(
37 ServerOptions {
38 transport: Transport::Udp,
39 idle_timeout,
40 ssl: None,
41 external,
42 listen,
43 mtu,
44 },
45 service.clone(),
46 statistics.clone(),
47 exchanger.clone(),
48 ));
49 }
50 Interface::Tcp {
51 listen,
52 external,
53 idle_timeout,
54 ssl,
55 } => {
56 servers.spawn(TcpServer::start(
57 ServerOptions {
58 transport: Transport::Tcp,
59 idle_timeout,
60 external,
61 listen,
62 mtu: 0,
63 ssl,
64 },
65 service.clone(),
66 statistics.clone(),
67 exchanger.clone(),
68 ));
69 }
70 };
71 }
72
73 if let Some(res) = servers.join_next().await {
76 servers.abort_all();
77
78 return res?;
79 }
80
81 Ok(())
82}
83
/// Tag that accompanies every buffer passed through an [`Exchanger`] channel
/// and tells the receiving side what kind of payload the bytes hold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PayloadType {
    /// The bytes are a message; the attached [`Method`] identifies which one.
    Message(Method),
    /// The bytes are channel data (no method is attached).
    ChannelData,
}
89
/// Sending half of the unbounded channel stored per address in an
/// [`Exchanger`]; each item is the raw bytes plus their [`PayloadType`].
type ExchangerSender = UnboundedSender<(Bytes, PayloadType)>;
91
/// Registry that maps a socket address to the sender of an unbounded channel.
///
/// The map lives behind `Arc<RwLock<..>>`, so cloning an `Exchanger` yields
/// another handle to the same shared table rather than a copy of it.
#[derive(Clone)]
pub struct Exchanger(Arc<RwLock<HashMap<SocketAddr, ExchangerSender>>>);
95
96impl Default for Exchanger {
97 fn default() -> Self {
98 Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
99 }
100}
101
102impl Exchanger {
103 fn get_receiver(&self, interface: SocketAddr) -> UnboundedReceiver<(Bytes, PayloadType)> {
108 let (sender, receiver) = unbounded_channel();
109 self.0.write().insert(interface, sender);
110
111 receiver
112 }
113
114 fn send(&self, interface: &SocketAddr, ty: PayloadType, data: Bytes) {
121 let mut is_destroy = false;
122
123 {
124 if let Some(sender) = self.0.read().get(interface)
125 && sender.send((data, ty)).is_err()
126 {
127 is_destroy = true;
128 }
129 }
130
131 if is_destroy {
132 self.remove(interface);
133 }
134 }
135
136 pub fn remove(&self, interface: &SocketAddr) {
138 drop(self.0.write().remove(interface))
139 }
140}