// turn_driver/lib.rs
use std::{fmt::Display, future::Future, net::SocketAddr, sync::Arc, time::Duration};

use async_trait::async_trait;
use axum::{
    extract::{Json as Body, Query, State},
    http::HeaderMap,
    response::IntoResponse,
    routing::{get, post},
    Router,
};
use reqwest::{Client, ClientBuilder, Response, StatusCode};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::net::TcpListener;

16#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)]
17#[serde(rename_all = "lowercase")]
18pub enum Transport {
19 TCP,
20 UDP,
21}
22
23#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)]
24pub struct SessionAddr {
25 pub address: SocketAddr,
26 pub interface: SocketAddr,
27}
28
29#[derive(Deserialize, Serialize, Debug, Clone)]
30pub struct Interface {
31 pub transport: Transport,
32 /// turn server listen address
33 pub bind: SocketAddr,
34 /// specify the node external address and port
35 pub external: SocketAddr,
36}
37
38#[derive(Debug, Clone, Deserialize, Serialize)]
39pub struct Info {
40 /// Software information of turn server
41 pub software: String,
42 /// Turn the server's running time in seconds
43 pub uptime: u64,
44 /// The number of allocated ports
45 pub port_allocated: u16,
46 /// The total number of ports available for allocation
47 pub port_capacity: u16,
48 /// Turn all interfaces bound to the server
49 pub interfaces: Vec<Interface>,
50}
51
52#[derive(Debug, Clone, Deserialize, Serialize)]
53pub struct Session {
54 /// Username used in session authentication
55 pub username: String,
56 /// The password used in session authentication
57 pub password: String,
58 /// Channel numbers that have been assigned to the session
59 pub channels: Vec<u16>,
60 /// Port numbers that have been assigned to the session
61 pub port: Option<u16>,
62 /// The validity period of the current session application, in seconds
63 pub expires: u32,
64 pub permissions: Vec<u16>,
65}
66
67#[derive(Debug, Clone, Deserialize)]
68pub struct Statistics {
69 /// Number of bytes received in the current session
70 pub received_bytes: u64,
71 /// The number of bytes sent by the current session
72 pub send_bytes: u64,
73 /// Number of packets received in the current session
74 pub received_pkts: u64,
75 /// The number of packets sent by the current session
76 pub send_pkts: u64,
77 /// The number of packets error by the current session
78 pub error_pkts: u64,
79}
80
81impl<'a> Display for SessionAddr {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(
84 f,
85 "{}",
86 format!("address={}&interface={}", self.address, self.interface)
87 )
88 }
89}
90
/// Envelope around the payload of a turn server control response
#[derive(Debug)]
pub struct Message<T> {
    /// Realm of the turn server
    pub realm: String,
    /// Runtime id of the turn server. It is a random string regenerated on
    /// every server start, so a changed value mainly tells the caller that
    /// the turn server has been restarted.
    pub nonce: String,
    /// The response payload itself
    pub payload: T,
}

103impl<T> Message<T> {
104 async fn from_res<F: Future<Output = Option<T>>>(
105 res: Response,
106 handler: impl FnOnce(Response) -> F,
107 ) -> Option<Self> {
108 let (realm, nonce) = get_realm_and_nonce(res.headers())?;
109 Some(Self {
110 realm: realm.to_string(),
111 nonce: nonce.to_string(),
112 payload: handler(res).await?,
113 })
114 }
115}
116
117/// The controller of the turn server is used to control the server and obtain
118/// server information through the HTTP interface
119pub struct Controller {
120 client: Client,
121 server: String,
122}
123
124impl Controller {
125 /// Create a controller by specifying the listening address of the turn
126 /// server api interface, such as `http://localhost:3000`
127 pub fn new(server: &str) -> Result<Self, reqwest::Error> {
128 Ok(Self {
129 server: server.to_string(),
130 client: ClientBuilder::new()
131 .timeout(Duration::from_secs(5))
132 .build()?,
133 })
134 }
135
136 /// Get the information of the turn server, including version information,
137 /// listening interface, startup time, etc.
138 pub async fn get_info(&self) -> Option<Message<Info>> {
139 Message::from_res(
140 self.client
141 .get(format!("{}/info", self.server))
142 .send()
143 .await
144 .ok()?,
145 |res| async { res.json().await.ok() },
146 )
147 .await
148 }
149
150 /// Get session information. A session corresponds to each UDP socket. It
151 /// should be noted that a user can have multiple sessions at the same time.
152 pub async fn get_session(&self, query: &SessionAddr) -> Option<Message<Session>> {
153 Message::from_res(
154 self.client
155 .get(format!("{}/session?{}", self.server, query))
156 .send()
157 .await
158 .ok()?,
159 |res| async { res.json().await.ok() },
160 )
161 .await
162 }
163
164 /// Get session statistics, which is mainly the traffic statistics of the
165 /// current session
166 pub async fn get_session_statistics(&self, query: &SessionAddr) -> Option<Message<Statistics>> {
167 Message::from_res(
168 self.client
169 .get(format!("{}/session/statistics?{}", self.server, query))
170 .send()
171 .await
172 .ok()?,
173 |res| async { res.json().await.ok() },
174 )
175 .await
176 }
177
178 /// Delete the session. Deleting the session will cause the turn server to
179 /// delete all routing information of the current session. If there is a
180 /// peer, the peer will also be disconnected.
181 pub async fn remove_session(&self, query: &SessionAddr) -> Option<Message<bool>> {
182 Message::from_res(
183 self.client
184 .delete(format!("{}/session?{}", self.server, query))
185 .send()
186 .await
187 .ok()?,
188 |res| async move { Some(res.status() == StatusCode::OK) },
189 )
190 .await
191 }
192}
193
194#[derive(Debug, Deserialize)]
195#[serde(tag = "kind", rename_all = "snake_case")]
196pub enum Events {
197 /// allocate request
198 ///
199 /// [rfc8489](https://tools.ietf.org/html/rfc8489)
200 ///
201 /// In all cases, the server SHOULD only allocate ports from the range
202 /// 49152 - 65535 (the Dynamic and/or Private Port range [PORT-NUMBERS]),
203 /// unless the TURN server application knows, through some means not
204 /// specified here, that other applications running on the same host as
205 /// the TURN server application will not be impacted by allocating ports
206 /// outside this range. This condition can often be satisfied by running
207 /// the TURN server application on a dedicated machine and/or by
208 /// arranging that any other applications on the machine allocate ports
209 /// before the TURN server application starts. In any case, the TURN
210 /// server SHOULD NOT allocate ports in the range 0 - 1023 (the Well-
211 /// Known Port range) to discourage clients from using TURN to run
212 /// standard services.
213 Allocated {
214 session: SessionAddr,
215 username: String,
216 port: u16,
217 },
218 /// channel binding request
219 ///
220 /// The server MAY impose restrictions on the IP address and port values
221 /// allowed in the XOR-PEER-ADDRESS attribute; if a value is not allowed,
222 /// the server rejects the request with a 403 (Forbidden) error.
223 ///
224 /// If the request is valid, but the server is unable to fulfill the
225 /// request due to some capacity limit or similar, the server replies
226 /// with a 508 (Insufficient Capacity) error.
227 ///
228 /// Otherwise, the server replies with a ChannelBind success response.
229 /// There are no required attributes in a successful ChannelBind
230 /// response.
231 ///
232 /// If the server can satisfy the request, then the server creates or
233 /// refreshes the channel binding using the channel number in the
234 /// CHANNEL-NUMBER attribute and the transport address in the XOR-PEER-
235 /// ADDRESS attribute. The server also installs or refreshes a
236 /// permission for the IP address in the XOR-PEER-ADDRESS attribute as
237 /// described in Section 9.
238 ///
239 /// NOTE: A server need not do anything special to implement
240 /// idempotency of ChannelBind requests over UDP using the
241 /// "stateless stack approach". Retransmitted ChannelBind requests
242 /// will simply refresh the channel binding and the corresponding
243 /// permission. Furthermore, the client must wait 5 minutes before
244 /// binding a previously bound channel number or peer address to a
245 /// different channel, eliminating the possibility that the
246 /// transaction would initially fail but succeed on a
247 /// retransmission.
248 ChannelBind {
249 session: SessionAddr,
250 username: String,
251 channel: u16,
252 },
253 /// create permission request
254 ///
255 /// [rfc8489](https://tools.ietf.org/html/rfc8489)
256 ///
257 /// When the server receives the CreatePermission request, it processes
258 /// as per [Section 5](https://tools.ietf.org/html/rfc8656#section-5)
259 /// plus the specific rules mentioned here.
260 ///
261 /// The message is checked for validity. The CreatePermission request
262 /// MUST contain at least one XOR-PEER-ADDRESS attribute and MAY contain
263 /// multiple such attributes. If no such attribute exists, or if any of
264 /// these attributes are invalid, then a 400 (Bad Request) error is
265 /// returned. If the request is valid, but the server is unable to
266 /// satisfy the request due to some capacity limit or similar, then a 508
267 /// (Insufficient Capacity) error is returned.
268 ///
269 /// If an XOR-PEER-ADDRESS attribute contains an address of an address
270 /// family that is not the same as that of a relayed transport address
271 /// for the allocation, the server MUST generate an error response with
272 /// the 443 (Peer Address Family Mismatch) response code.
273 ///
274 /// The server MAY impose restrictions on the IP address allowed in the
275 /// XOR-PEER-ADDRESS attribute; if a value is not allowed, the server
276 /// rejects the request with a 403 (Forbidden) error.
277 ///
278 /// If the message is valid and the server is capable of carrying out the
279 /// request, then the server installs or refreshes a permission for the
280 /// IP address contained in each XOR-PEER-ADDRESS attribute as described
281 /// in [Section 9](https://tools.ietf.org/html/rfc8656#section-9).
282 /// The port portion of each attribute is ignored and may be any arbitrary
283 /// value.
284 ///
285 /// The server then responds with a CreatePermission success response.
286 /// There are no mandatory attributes in the success response.
287 ///
288 /// NOTE: A server need not do anything special to implement idempotency of
289 /// CreatePermission requests over UDP using the "stateless stack approach".
290 /// Retransmitted CreatePermission requests will simply refresh the
291 /// permissions.
292 CreatePermission {
293 session: SessionAddr,
294 username: String,
295 ports: Vec<u16>,
296 },
297 /// refresh request
298 ///
299 /// If the server receives a Refresh Request with a REQUESTED-ADDRESS-
300 /// FAMILY attribute and the attribute value does not match the address
301 /// family of the allocation, the server MUST reply with a 443 (Peer
302 /// Address Family Mismatch) Refresh error response.
303 ///
304 /// The server computes a value called the "desired lifetime" as follows:
305 /// if the request contains a LIFETIME attribute and the attribute value
306 /// is zero, then the "desired lifetime" is zero. Otherwise, if the
307 /// request contains a LIFETIME attribute, then the server computes the
308 /// minimum of the client's requested lifetime and the server's maximum
309 /// allowed lifetime. If this computed value is greater than the default
310 /// lifetime, then the "desired lifetime" is the computed value.
311 /// Otherwise, the "desired lifetime" is the default lifetime.
312 ///
313 /// Subsequent processing depends on the "desired lifetime" value:
314 ///
315 /// * If the "desired lifetime" is zero, then the request succeeds and the
316 /// allocation is deleted.
317 ///
318 /// * If the "desired lifetime" is non-zero, then the request succeeds and
319 /// the allocation's time-to-expiry is set to the "desired lifetime".
320 ///
321 /// If the request succeeds, then the server sends a success response
322 /// containing:
323 ///
324 /// * A LIFETIME attribute containing the current value of the
325 /// time-to-expiry timer.
326 ///
327 /// NOTE: A server need not do anything special to implement
328 /// idempotency of Refresh requests over UDP using the "stateless
329 /// stack approach". Retransmitted Refresh requests with a non-
330 /// zero "desired lifetime" will simply refresh the allocation. A
331 /// retransmitted Refresh request with a zero "desired lifetime"
332 /// will cause a 437 (Allocation Mismatch) response if the
333 /// allocation has already been deleted, but the client will treat
334 /// this as equivalent to a success response (see below).
335 Refresh {
336 session: SessionAddr,
337 username: String,
338 lifetime: u32,
339 },
340 /// session closed
341 ///
342 /// Triggered when the session leaves from the turn. Possible reasons: the
343 /// session life cycle has expired, external active deletion, or active
344 /// exit of the session.
345 Closed {
346 session: SessionAddr,
347 username: String,
348 },
349}
350
351/// Abstraction that handles turn server communication with the outside world
352///
353/// ```ignore
354/// struct HooksImpl;
355///
356/// #[async_trait]
357/// impl Hooks for HooksImpl {
358/// async fn auth(&self, addr: SocketAddr, name: String, realm: String, rid: String) -> Option<&str> {
359/// get_password(username).await // Pretend this function exists
360/// }
361///
362/// async fn on(&self, event: Events, realm: String, rid: String) {
363/// println!("event={:?}, realm={}, rid={}", event, realm, rid)
364/// }
365/// }
366/// ```
367#[async_trait]
368pub trait Hooks {
369 /// When the turn server needs to authenticate the current user, hooks only
370 /// needs to find the key according to the username and other information of
371 /// the current session and return it
372 #[allow(unused_variables)]
373 async fn auth(
374 &self,
375 session: &SessionAddr,
376 username: &str,
377 realm: &str,
378 nonce: &str,
379 ) -> Option<&str> {
380 None
381 }
382
383 /// Called when the turn server pushes an event
384 #[allow(unused_variables)]
385 async fn on(&self, event: &Events, realm: &str, nonce: &str) {}
386}
387
388#[derive(Deserialize)]
389struct GetPasswordQuery {
390 address: SocketAddr,
391 interface: SocketAddr,
392 username: String,
393}
394
395/// Create a hooks service, which will create an HTTP server. The turn server
396/// can request this server and push events to this server.
397pub async fn start_hooks_server<T>(bind: SocketAddr, hooks: T) -> Result<(), std::io::Error>
398where
399 T: Hooks + Send + Sync + 'static,
400{
401 let app = Router::new()
402 .route(
403 "/password",
404 get(
405 |headers: HeaderMap,
406 State(state): State<Arc<T>>,
407 Query(query): Query<GetPasswordQuery>| async move {
408 if let Some((realm, nonce)) = get_realm_and_nonce(&headers) {
409 if let Some(password) =
410 state.auth(&SessionAddr {
411 address: query.address,
412 interface: query.interface,
413 }, &query.username, realm, nonce).await
414 {
415 return password.to_string().into_response();
416 }
417 }
418
419 StatusCode::NOT_FOUND.into_response()
420 },
421 ),
422 )
423 .route(
424 "/events",
425 post(
426 |headers: HeaderMap, State(state): State<Arc<T>>, Body(event): Body<Events>| async move {
427 if let Some((realm, nonce)) = get_realm_and_nonce(&headers) {
428 state.on(&event, realm, nonce).await;
429 }
430
431 StatusCode::OK
432 },
433 ),
434 )
435 .with_state(Arc::new(hooks));
436
437 axum::serve(TcpListener::bind(bind).await?, app).await?;
438
439 Ok(())
440}
441
442fn get_realm_and_nonce(headers: &HeaderMap) -> Option<(&str, &str)> {
443 if let (Some(Ok(realm)), Some(Ok(nonce))) = (
444 headers.get("realm").map(|it| it.to_str()),
445 headers.get("nonce").map(|it| it.to_str()),
446 ) {
447 Some((realm, nonce))
448 } else {
449 None
450 }
451}