1#[cfg(test)]
2mod server_test;
3
4pub mod config;
5pub mod request;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use config::*;
11use request::*;
12use tokio::sync::broadcast::error::RecvError;
13use tokio::sync::broadcast::{self};
14use tokio::sync::{mpsc, oneshot, Mutex};
15use tokio::time::{Duration, Instant};
16use util::Conn;
17
18use crate::allocation::allocation_manager::*;
19use crate::allocation::five_tuple::FiveTuple;
20use crate::allocation::AllocationInfo;
21use crate::auth::AuthHandler;
22use crate::error::*;
23use crate::proto::lifetime::DEFAULT_LIFETIME;
24
25const INBOUND_MTU: usize = 1500;
26
27pub struct Server {
29 auth_handler: Arc<dyn AuthHandler + Send + Sync>,
30 realm: String,
31 channel_bind_timeout: Duration,
32 pub(crate) nonces: Arc<Mutex<HashMap<String, Instant>>>,
33 command_tx: Mutex<Option<broadcast::Sender<Command>>>,
34}
35
36impl Server {
37 pub async fn new(config: ServerConfig) -> Result<Self> {
39 config.validate()?;
40
41 let (command_tx, _) = broadcast::channel(16);
42 let mut s = Server {
43 auth_handler: config.auth_handler,
44 realm: config.realm,
45 channel_bind_timeout: config.channel_bind_timeout,
46 nonces: Arc::new(Mutex::new(HashMap::new())),
47 command_tx: Mutex::new(Some(command_tx.clone())),
48 };
49
50 if s.channel_bind_timeout == Duration::from_secs(0) {
51 s.channel_bind_timeout = DEFAULT_LIFETIME;
52 }
53
54 for p in config.conn_configs.into_iter() {
55 let nonces = Arc::clone(&s.nonces);
56 let auth_handler = Arc::clone(&s.auth_handler);
57 let realm = s.realm.clone();
58 let channel_bind_timeout = s.channel_bind_timeout;
59 let handle_rx = command_tx.subscribe();
60 let conn = p.conn;
61 let allocation_manager = Arc::new(Manager::new(ManagerConfig {
62 relay_addr_generator: p.relay_addr_generator,
63 alloc_close_notify: config.alloc_close_notify.clone(),
64 }));
65
66 tokio::spawn(Server::read_loop(
67 conn,
68 allocation_manager,
69 nonces,
70 auth_handler,
71 realm,
72 channel_bind_timeout,
73 handle_rx,
74 ));
75 }
76
77 Ok(s)
78 }
79
80 pub async fn delete_allocations_by_username(&self, username: String) -> Result<()> {
84 let tx = {
85 let command_tx = self.command_tx.lock().await;
86 command_tx.clone()
87 };
88 if let Some(tx) = tx {
89 let (closed_tx, closed_rx) = mpsc::channel(1);
90 tx.send(Command::DeleteAllocations(username, Arc::new(closed_rx)))
91 .map_err(|_| Error::ErrClosed)?;
92
93 closed_tx.closed().await;
94
95 Ok(())
96 } else {
97 Err(Error::ErrClosed)
98 }
99 }
100
101 pub async fn get_allocations_info(
113 &self,
114 five_tuples: Option<Vec<FiveTuple>>,
115 ) -> Result<HashMap<FiveTuple, AllocationInfo>> {
116 if let Some(five_tuples) = &five_tuples {
117 if five_tuples.is_empty() {
118 return Ok(HashMap::new());
119 }
120 }
121
122 let tx = {
123 let command_tx = self.command_tx.lock().await;
124 command_tx.clone()
125 };
126 if let Some(tx) = tx {
127 let (infos_tx, mut infos_rx) = mpsc::channel(1);
128 tx.send(Command::GetAllocationsInfo(five_tuples, infos_tx))
129 .map_err(|_| Error::ErrClosed)?;
130
131 let mut info: HashMap<FiveTuple, AllocationInfo> = HashMap::new();
132
133 for _ in 0..tx.receiver_count() {
134 info.extend(infos_rx.recv().await.ok_or(Error::ErrClosed)?);
135 }
136
137 Ok(info)
138 } else {
139 Err(Error::ErrClosed)
140 }
141 }
142
143 async fn read_loop(
144 conn: Arc<dyn Conn + Send + Sync>,
145 allocation_manager: Arc<Manager>,
146 nonces: Arc<Mutex<HashMap<String, Instant>>>,
147 auth_handler: Arc<dyn AuthHandler + Send + Sync>,
148 realm: String,
149 channel_bind_timeout: Duration,
150 mut handle_rx: broadcast::Receiver<Command>,
151 ) {
152 let mut buf = vec![0u8; INBOUND_MTU];
153
154 let (mut close_tx, mut close_rx) = oneshot::channel::<()>();
155
156 tokio::spawn({
157 let allocation_manager = Arc::clone(&allocation_manager);
158
159 async move {
160 loop {
161 match handle_rx.recv().await {
162 Ok(Command::DeleteAllocations(name, _)) => {
163 allocation_manager
164 .delete_allocations_by_username(name.as_str())
165 .await;
166 continue;
167 }
168 Ok(Command::GetAllocationsInfo(five_tuples, tx)) => {
169 let infos = allocation_manager.get_allocations_info(five_tuples).await;
170 let _ = tx.send(infos).await;
171
172 continue;
173 }
174 Err(RecvError::Closed) | Ok(Command::Close(_)) => {
175 close_rx.close();
176 break;
177 }
178 Err(RecvError::Lagged(n)) => {
179 log::warn!("Turn server has lagged by {} messages", n);
180 continue;
181 }
182 }
183 }
184 }
185 });
186
187 loop {
188 let (n, addr) = tokio::select! {
189 v = conn.recv_from(&mut buf) => {
190 match v {
191 Ok(v) => v,
192 Err(err) => {
193 log::debug!("exit read loop on error: {}", err);
194 break;
195 }
196 }
197 },
198 _ = close_tx.closed() => break
199 };
200
201 let mut r = Request {
202 conn: Arc::clone(&conn),
203 src_addr: addr,
204 buff: buf[..n].to_vec(),
205 allocation_manager: Arc::clone(&allocation_manager),
206 nonces: Arc::clone(&nonces),
207 auth_handler: Arc::clone(&auth_handler),
208 realm: realm.clone(),
209 channel_bind_timeout,
210 };
211
212 if let Err(err) = r.handle_request().await {
213 log::error!("error when handling datagram: {}", err);
214 }
215 }
216
217 let _ = allocation_manager.close().await;
218 let _ = conn.close().await;
219 }
220
221 pub async fn close(&self) -> Result<()> {
223 let tx = {
224 let mut command_tx = self.command_tx.lock().await;
225 command_tx.take()
226 };
227
228 if let Some(tx) = tx {
229 if tx.receiver_count() == 0 {
230 return Ok(());
231 }
232
233 let (closed_tx, closed_rx) = mpsc::channel(1);
234 let _ = tx.send(Command::Close(Arc::new(closed_rx)));
235 closed_tx.closed().await
236 }
237
238 Ok(())
239 }
240}
241
242#[derive(Clone)]
245enum Command {
246 DeleteAllocations(String, Arc<mpsc::Receiver<()>>),
250
251 GetAllocationsInfo(
255 Option<Vec<FiveTuple>>,
256 mpsc::Sender<HashMap<FiveTuple, AllocationInfo>>,
257 ),
258
259 Close(Arc<mpsc::Receiver<()>>),
261}