turn_server/server/
mod.rs1pub mod transport;
2
3use std::{net::SocketAddr, sync::Arc};
4
5use ahash::{HashMap, HashMapExt};
6use anyhow::Result;
7use bytes::Bytes;
8use parking_lot::RwLock;
9
10use tokio::{
11 sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
12 task::JoinSet,
13};
14
15use crate::{
16 Service,
17 codec::message::methods::Method,
18 config::{Config, Interface},
19 server::transport::{Server, ServerOptions, Transport, tcp::TcpServer, udp::UdpServer},
20 statistics::Statistics,
21};
22
23pub async fn start_server(config: Config, service: Service, statistics: Statistics) -> Result<()> {
24 let exchanger = Exchanger::default();
25
26 let mut servers = JoinSet::new();
27
28 for interface in config.server.interfaces {
29 match interface {
30 Interface::Udp {
31 listen,
32 external,
33 idle_timeout,
34 mtu,
35 } => {
36 servers.spawn(UdpServer::start(
37 ServerOptions {
38 transport: Transport::Udp,
39 idle_timeout,
40 ssl: None,
41 external,
42 listen,
43 mtu,
44 },
45 service.clone(),
46 statistics.clone(),
47 exchanger.clone(),
48 ));
49 }
50 Interface::Tcp {
51 listen,
52 external,
53 idle_timeout,
54 ssl,
55 } => {
56 servers.spawn(TcpServer::start(
57 ServerOptions {
58 transport: Transport::Tcp,
59 idle_timeout,
60 external,
61 listen,
62 mtu: 0,
63 ssl,
64 },
65 service.clone(),
66 statistics.clone(),
67 exchanger.clone(),
68 ));
69 }
70 };
71 }
72
73 if let Some(res) = servers.join_next().await {
76 servers.abort_all();
77
78 return res?;
79 }
80
81 Ok(())
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85enum PayloadType {
86 Message(Method),
87 ChannelData,
88}
89
90#[derive(Clone)]
92pub struct Exchanger(Arc<RwLock<HashMap<SocketAddr, UnboundedSender<(Bytes, PayloadType)>>>>);
93
94impl Default for Exchanger {
95 fn default() -> Self {
96 Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
97 }
98}
99
100impl Exchanger {
101 fn get_receiver(&self, interface: SocketAddr) -> UnboundedReceiver<(Bytes, PayloadType)> {
106 let (sender, receiver) = unbounded_channel();
107 self.0.write().insert(interface, sender);
108
109 receiver
110 }
111
112 fn send(&self, interface: &SocketAddr, ty: PayloadType, data: Bytes) {
119 let mut is_destroy = false;
120
121 {
122 if let Some(sender) = self.0.read().get(interface)
123 && sender.send((data, ty)).is_err()
124 {
125 is_destroy = true;
126 }
127 }
128
129 if is_destroy {
130 self.remove(interface);
131 }
132 }
133
134 pub fn remove(&self, interface: &SocketAddr) {
136 drop(self.0.write().remove(interface))
137 }
138}