turn_driver/
lib.rs

1use std::{fmt::Display, future::Future, net::SocketAddr, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use axum::{
5    extract::{Json as Body, Query, State},
6    http::HeaderMap,
7    response::IntoResponse,
8    routing::{get, post},
9    Router,
10};
11
12use reqwest::{Client, ClientBuilder, Response, StatusCode};
13use serde::{Deserialize, Serialize};
14use tokio::net::TcpListener;
15
16#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)]
17#[serde(rename_all = "lowercase")]
18pub enum Transport {
19    TCP,
20    UDP,
21}
22
23#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)]
24pub struct SessionAddr {
25    pub address: SocketAddr,
26    pub interface: SocketAddr,
27}
28
29#[derive(Deserialize, Serialize, Debug, Clone)]
30pub struct Interface {
31    pub transport: Transport,
32    /// turn server listen address
33    pub bind: SocketAddr,
34    /// specify the node external address and port
35    pub external: SocketAddr,
36}
37
38#[derive(Debug, Clone, Deserialize, Serialize)]
39pub struct Info {
40    /// Software information of turn server
41    pub software: String,
42    /// Turn the server's running time in seconds
43    pub uptime: u64,
44    /// The number of allocated ports
45    pub port_allocated: u16,
46    /// The total number of ports available for allocation
47    pub port_capacity: u16,
48    /// Turn all interfaces bound to the server
49    pub interfaces: Vec<Interface>,
50}
51
52#[derive(Debug, Clone, Deserialize, Serialize)]
53pub struct Session {
54    /// Username used in session authentication
55    pub username: String,
56    /// The password used in session authentication
57    pub password: String,
58    /// Channel numbers that have been assigned to the session
59    pub channels: Vec<u16>,
60    /// Port numbers that have been assigned to the session
61    pub port: Option<u16>,
62    /// The validity period of the current session application, in seconds
63    pub expires: u32,
64    pub permissions: Vec<u16>,
65}
66
67#[derive(Debug, Clone, Deserialize)]
68pub struct Statistics {
69    /// Number of bytes received in the current session
70    pub received_bytes: u64,
71    /// The number of bytes sent by the current session
72    pub send_bytes: u64,
73    /// Number of packets received in the current session
74    pub received_pkts: u64,
75    /// The number of packets sent by the current session
76    pub send_pkts: u64,
77    /// The number of packets error by the current session
78    pub error_pkts: u64,
79}
80
81impl<'a> Display for SessionAddr {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        write!(
84            f,
85            "{}",
86            format!("address={}&interface={}", self.address, self.interface)
87        )
88    }
89}
90
91/// Controlling message packaging
92#[derive(Debug)]
93pub struct Message<T> {
94    /// turn server realm
95    pub realm: String,
96    /// The runtime id of the turn server. A new ID is generated each time the
97    /// server is started. This is a random string. Its main function is to
98    /// determine whether the turn server has been restarted.
99    pub nonce: String,
100    pub payload: T,
101}
102
103impl<T> Message<T> {
104    async fn from_res<F: Future<Output = Option<T>>>(
105        res: Response,
106        handler: impl FnOnce(Response) -> F,
107    ) -> Option<Self> {
108        let (realm, nonce) = get_realm_and_nonce(res.headers())?;
109        Some(Self {
110            realm: realm.to_string(),
111            nonce: nonce.to_string(),
112            payload: handler(res).await?,
113        })
114    }
115}
116
117/// The controller of the turn server is used to control the server and obtain
118/// server information through the HTTP interface
119pub struct Controller {
120    client: Client,
121    server: String,
122}
123
124impl Controller {
125    /// Create a controller by specifying the listening address of the turn
126    /// server api interface, such as `http://localhost:3000`
127    pub fn new(server: &str) -> Result<Self, reqwest::Error> {
128        Ok(Self {
129            server: server.to_string(),
130            client: ClientBuilder::new()
131                .timeout(Duration::from_secs(5))
132                .build()?,
133        })
134    }
135
136    /// Get the information of the turn server, including version information,
137    /// listening interface, startup time, etc.
138    pub async fn get_info(&self) -> Option<Message<Info>> {
139        Message::from_res(
140            self.client
141                .get(format!("{}/info", self.server))
142                .send()
143                .await
144                .ok()?,
145            |res| async { res.json().await.ok() },
146        )
147        .await
148    }
149
150    /// Get session information. A session corresponds to each UDP socket. It
151    /// should be noted that a user can have multiple sessions at the same time.
152    pub async fn get_session(&self, query: &SessionAddr) -> Option<Message<Session>> {
153        Message::from_res(
154            self.client
155                .get(format!("{}/session?{}", self.server, query))
156                .send()
157                .await
158                .ok()?,
159            |res| async { res.json().await.ok() },
160        )
161        .await
162    }
163
164    /// Get session statistics, which is mainly the traffic statistics of the
165    /// current session
166    pub async fn get_session_statistics(&self, query: &SessionAddr) -> Option<Message<Statistics>> {
167        Message::from_res(
168            self.client
169                .get(format!("{}/session/statistics?{}", self.server, query))
170                .send()
171                .await
172                .ok()?,
173            |res| async { res.json().await.ok() },
174        )
175        .await
176    }
177
178    /// Delete the session. Deleting the session will cause the turn server to
179    /// delete all routing information of the current session. If there is a
180    /// peer, the peer will also be disconnected.
181    pub async fn remove_session(&self, query: &SessionAddr) -> Option<Message<bool>> {
182        Message::from_res(
183            self.client
184                .delete(format!("{}/session?{}", self.server, query))
185                .send()
186                .await
187                .ok()?,
188            |res| async move { Some(res.status() == StatusCode::OK) },
189        )
190        .await
191    }
192}
193
194#[derive(Debug, Deserialize)]
195#[serde(tag = "kind", rename_all = "snake_case")]
196pub enum Events {
197    /// allocate request
198    ///
199    /// [rfc8489](https://tools.ietf.org/html/rfc8489)
200    ///
201    /// In all cases, the server SHOULD only allocate ports from the range
202    /// 49152 - 65535 (the Dynamic and/or Private Port range [PORT-NUMBERS]),
203    /// unless the TURN server application knows, through some means not
204    /// specified here, that other applications running on the same host as
205    /// the TURN server application will not be impacted by allocating ports
206    /// outside this range.  This condition can often be satisfied by running
207    /// the TURN server application on a dedicated machine and/or by
208    /// arranging that any other applications on the machine allocate ports
209    /// before the TURN server application starts.  In any case, the TURN
210    /// server SHOULD NOT allocate ports in the range 0 - 1023 (the Well-
211    /// Known Port range) to discourage clients from using TURN to run
212    /// standard services.
213    Allocated {
214        session: SessionAddr,
215        username: String,
216        port: u16,
217    },
218    /// channel binding request
219    ///
220    /// The server MAY impose restrictions on the IP address and port values
221    /// allowed in the XOR-PEER-ADDRESS attribute; if a value is not allowed,
222    /// the server rejects the request with a 403 (Forbidden) error.
223    ///
224    /// If the request is valid, but the server is unable to fulfill the
225    /// request due to some capacity limit or similar, the server replies
226    /// with a 508 (Insufficient Capacity) error.
227    ///
228    /// Otherwise, the server replies with a ChannelBind success response.
229    /// There are no required attributes in a successful ChannelBind
230    /// response.
231    ///
232    /// If the server can satisfy the request, then the server creates or
233    /// refreshes the channel binding using the channel number in the
234    /// CHANNEL-NUMBER attribute and the transport address in the XOR-PEER-
235    /// ADDRESS attribute.  The server also installs or refreshes a
236    /// permission for the IP address in the XOR-PEER-ADDRESS attribute as
237    /// described in Section 9.
238    ///
239    /// NOTE: A server need not do anything special to implement
240    /// idempotency of ChannelBind requests over UDP using the
241    /// "stateless stack approach".  Retransmitted ChannelBind requests
242    /// will simply refresh the channel binding and the corresponding
243    /// permission.  Furthermore, the client must wait 5 minutes before
244    /// binding a previously bound channel number or peer address to a
245    /// different channel, eliminating the possibility that the
246    /// transaction would initially fail but succeed on a
247    /// retransmission.
248    ChannelBind {
249        session: SessionAddr,
250        username: String,
251        channel: u16,
252    },
253    /// create permission request
254    ///
255    /// [rfc8489](https://tools.ietf.org/html/rfc8489)
256    ///
257    /// When the server receives the CreatePermission request, it processes
258    /// as per [Section 5](https://tools.ietf.org/html/rfc8656#section-5)
259    /// plus the specific rules mentioned here.
260    ///
261    /// The message is checked for validity.  The CreatePermission request
262    /// MUST contain at least one XOR-PEER-ADDRESS attribute and MAY contain
263    /// multiple such attributes.  If no such attribute exists, or if any of
264    /// these attributes are invalid, then a 400 (Bad Request) error is
265    /// returned.  If the request is valid, but the server is unable to
266    /// satisfy the request due to some capacity limit or similar, then a 508
267    /// (Insufficient Capacity) error is returned.
268    ///
269    /// If an XOR-PEER-ADDRESS attribute contains an address of an address
270    /// family that is not the same as that of a relayed transport address
271    /// for the allocation, the server MUST generate an error response with
272    /// the 443 (Peer Address Family Mismatch) response code.
273    ///
274    /// The server MAY impose restrictions on the IP address allowed in the
275    /// XOR-PEER-ADDRESS attribute; if a value is not allowed, the server
276    /// rejects the request with a 403 (Forbidden) error.
277    ///
278    /// If the message is valid and the server is capable of carrying out the
279    /// request, then the server installs or refreshes a permission for the
280    /// IP address contained in each XOR-PEER-ADDRESS attribute as described
281    /// in [Section 9](https://tools.ietf.org/html/rfc8656#section-9).  
282    /// The port portion of each attribute is ignored and may be any arbitrary
283    /// value.
284    ///
285    /// The server then responds with a CreatePermission success response.
286    /// There are no mandatory attributes in the success response.
287    ///
288    /// NOTE: A server need not do anything special to implement idempotency of
289    /// CreatePermission requests over UDP using the "stateless stack approach".
290    /// Retransmitted CreatePermission requests will simply refresh the
291    /// permissions.
292    CreatePermission {
293        session: SessionAddr,
294        username: String,
295        ports: Vec<u16>,
296    },
297    /// refresh request
298    ///
299    /// If the server receives a Refresh Request with a REQUESTED-ADDRESS-
300    /// FAMILY attribute and the attribute value does not match the address
301    /// family of the allocation, the server MUST reply with a 443 (Peer
302    /// Address Family Mismatch) Refresh error response.
303    ///
304    /// The server computes a value called the "desired lifetime" as follows:
305    /// if the request contains a LIFETIME attribute and the attribute value
306    /// is zero, then the "desired lifetime" is zero.  Otherwise, if the
307    /// request contains a LIFETIME attribute, then the server computes the
308    /// minimum of the client's requested lifetime and the server's maximum
309    /// allowed lifetime.  If this computed value is greater than the default
310    /// lifetime, then the "desired lifetime" is the computed value.
311    /// Otherwise, the "desired lifetime" is the default lifetime.
312    ///
313    /// Subsequent processing depends on the "desired lifetime" value:
314    ///
315    /// * If the "desired lifetime" is zero, then the request succeeds and the
316    ///   allocation is deleted.
317    ///
318    /// * If the "desired lifetime" is non-zero, then the request succeeds and
319    ///   the allocation's time-to-expiry is set to the "desired lifetime".
320    ///
321    /// If the request succeeds, then the server sends a success response
322    /// containing:
323    ///
324    /// * A LIFETIME attribute containing the current value of the
325    ///   time-to-expiry timer.
326    ///
327    /// NOTE: A server need not do anything special to implement
328    /// idempotency of Refresh requests over UDP using the "stateless
329    /// stack approach".  Retransmitted Refresh requests with a non-
330    /// zero "desired lifetime" will simply refresh the allocation.  A
331    /// retransmitted Refresh request with a zero "desired lifetime"
332    /// will cause a 437 (Allocation Mismatch) response if the
333    /// allocation has already been deleted, but the client will treat
334    /// this as equivalent to a success response (see below).
335    Refresh {
336        session: SessionAddr,
337        username: String,
338        lifetime: u32,
339    },
340    /// session closed
341    ///
342    /// Triggered when the session leaves from the turn. Possible reasons: the
343    /// session life cycle has expired, external active deletion, or active
344    /// exit of the session.
345    Closed {
346        session: SessionAddr,
347        username: String,
348    },
349}
350
351/// Abstraction that handles turn server communication with the outside world
352///
353/// ```ignore
354/// struct HooksImpl;
355///
356/// #[async_trait]
357/// impl Hooks for HooksImpl {
358///     async fn auth(&self, addr: SocketAddr, name: String, realm: String, rid: String) -> Option<&str> {
359///         get_password(username).await // Pretend this function exists
360///     }
361///
362///     async fn on(&self, event: Events, realm: String, rid: String) {
363///         println!("event={:?}, realm={}, rid={}", event, realm, rid)
364///     }
365/// }
366/// ```
367#[async_trait]
368pub trait Hooks {
369    /// When the turn server needs to authenticate the current user, hooks only
370    /// needs to find the key according to the username and other information of
371    /// the current session and return it
372    #[allow(unused_variables)]
373    async fn auth(
374        &self,
375        session: &SessionAddr,
376        username: &str,
377        realm: &str,
378        nonce: &str,
379    ) -> Option<&str> {
380        None
381    }
382
383    /// Called when the turn server pushes an event
384    #[allow(unused_variables)]
385    async fn on(&self, event: &Events, realm: &str, nonce: &str) {}
386}
387
388#[derive(Deserialize)]
389struct GetPasswordQuery {
390    address: SocketAddr,
391    interface: SocketAddr,
392    username: String,
393}
394
395/// Create a hooks service, which will create an HTTP server. The turn server
396/// can request this server and push events to this server.
397pub async fn start_hooks_server<T>(bind: SocketAddr, hooks: T) -> Result<(), std::io::Error>
398where
399    T: Hooks + Send + Sync + 'static,
400{
401    let app = Router::new()
402        .route(
403            "/password",
404            get(
405                |headers: HeaderMap,
406                 State(state): State<Arc<T>>,
407                 Query(query): Query<GetPasswordQuery>| async move {
408                    if let Some((realm, nonce)) = get_realm_and_nonce(&headers) {
409                        if let Some(password) =
410                            state.auth(&SessionAddr {
411                                address: query.address,
412                                interface: query.interface,
413                            }, &query.username, realm, nonce).await
414                        {
415                            return password.to_string().into_response();
416                        }
417                    }
418
419                    StatusCode::NOT_FOUND.into_response()
420                },
421            ),
422        )
423        .route(
424            "/events",
425            post(
426                |headers: HeaderMap, State(state): State<Arc<T>>, Body(event): Body<Events>| async move {
427                    if let Some((realm, nonce)) = get_realm_and_nonce(&headers) {
428                        state.on(&event, realm, nonce).await;
429                    }
430
431                    StatusCode::OK
432                },
433            ),
434        )
435        .with_state(Arc::new(hooks));
436
437    axum::serve(TcpListener::bind(bind).await?, app).await?;
438
439    Ok(())
440}
441
442fn get_realm_and_nonce(headers: &HeaderMap) -> Option<(&str, &str)> {
443    if let (Some(Ok(realm)), Some(Ok(nonce))) = (
444        headers.get("realm").map(|it| it.to_str()),
445        headers.get("nonce").map(|it| it.to_str()),
446    ) {
447        Some((realm, nonce))
448    } else {
449        None
450    }
451}