turn_server/
observer.rs

1use std::{future::Future, sync::Arc};
2
3use crate::{config::Config, statistics::Statistics};
4
5#[cfg(feature = "hooks")]
6use crate::publicly::hooks::HooksService;
7
8#[cfg(feature = "hooks")]
9use serde_json::json;
10
11use anyhow::Result;
12use base64::{prelude::BASE64_STANDARD, Engine};
13use turn::SessionAddr;
14
15#[derive(Clone)]
16pub struct Observer {
17    config: Arc<Config>,
18    #[cfg(feature = "hooks")]
19    hooks: Arc<HooksService>,
20    #[cfg(feature = "api")]
21    statistics: Statistics,
22}
23
24impl Observer {
25    #[allow(unused_variables)]
26    pub async fn new(config: Arc<Config>, statistics: Statistics) -> Result<Self> {
27        Ok(Self {
28            #[cfg(feature = "hooks")]
29            hooks: Arc::new(HooksService::new(config.clone())?),
30            #[cfg(feature = "api")]
31            statistics,
32            config,
33        })
34    }
35}
36
37impl turn::Observer for Observer {
38    fn get_password(&self, addr: &SessionAddr, username: &str) -> impl Future<Output = Option<String>> + Send {
39        async move {
40            log::info!(
41                "auth: address={:?}, interface={:?}, username={:?}",
42                addr.address,
43                addr.interface,
44                username,
45            );
46
47            // Match the static authentication information first.
48            if let Some(it) = self.config.auth.static_credentials.get(username) {
49                return Some(it.clone());
50            }
51
52            // Try again to match the static authentication key.
53            if let Some(it) = &self.config.auth.static_auth_secret {
54                // Because (TURN REST api) this RFC does not mandate the format of the username,
55                // only suggested values. In principle, the RFC also indicates that the
56                // timestamp part of username can be set at will, so the timestamp is not
57                // verified here, and the external web service guarantees its security by
58                // itself.
59                return encode_password(it, username);
60            }
61
62            #[cfg(feature = "hooks")]
63            {
64                if let Some(it) = self.hooks.get_password(addr, username).await {
65                    return Some(it);
66                }
67            }
68
69            None
70        }
71    }
72
73    /// allocate request
74    ///
75    /// [rfc8489](https://tools.ietf.org/html/rfc8489)
76    ///
77    /// In all cases, the server SHOULD only allocate ports from the range
78    /// 49152 - 65535 (the Dynamic and/or Private Port range [PORT-NUMBERS]),
79    /// unless the TURN server application knows, through some means not
80    /// specified here, that other applications running on the same host as
81    /// the TURN server application will not be impacted by allocating ports
82    /// outside this range.  This condition can often be satisfied by running
83    /// the TURN server application on a dedicated machine and/or by
84    /// arranging that any other applications on the machine allocate ports
85    /// before the TURN server application starts.  In any case, the TURN
86    /// server SHOULD NOT allocate ports in the range 0 - 1023 (the Well-
87    /// Known Port range) to discourage clients from using TURN to run
88    /// standard services.
89    #[allow(clippy::let_underscore_future)]
90    fn allocated(&self, addr: &SessionAddr, name: &str, port: u16) {
91        log::info!(
92            "allocate: address={:?}, interface={:?}, username={:?}, port={}",
93            addr.address,
94            addr.interface,
95            name,
96            port
97        );
98
99        #[cfg(feature = "api")]
100        {
101            self.statistics.register(*addr);
102        }
103
104        #[cfg(feature = "hooks")]
105        {
106            self.hooks.emit(json!({
107                "kind": "allocated",
108                "session": {
109                    "address": addr.address,
110                    "interface": addr.interface,
111                },
112                "username": name,
113                "port": port,
114            }));
115        }
116    }
117
118    /// channel binding request
119    ///
120    /// The server MAY impose restrictions on the IP address and port values
121    /// allowed in the XOR-PEER-ADDRESS attribute; if a value is not allowed,
122    /// the server rejects the request with a 403 (Forbidden) error.
123    ///
124    /// If the request is valid, but the server is unable to fulfill the
125    /// request due to some capacity limit or similar, the server replies
126    /// with a 508 (Insufficient Capacity) error.
127    ///
128    /// Otherwise, the server replies with a ChannelBind success response.
129    /// There are no required attributes in a successful ChannelBind
130    /// response.
131    ///
132    /// If the server can satisfy the request, then the server creates or
133    /// refreshes the channel binding using the channel number in the
134    /// CHANNEL-NUMBER attribute and the interface address in the XOR-PEER-
135    /// ADDRESS attribute.  The server also installs or refreshes a
136    /// permission for the IP address in the XOR-PEER-ADDRESS attribute as
137    /// described in Section 9.
138    ///
139    /// NOTE: A server need not do anything special to implement
140    /// idempotency of ChannelBind requests over UDP using the
141    /// "stateless stack approach".  Retransmitted ChannelBind requests
142    /// will simply refresh the channel binding and the corresponding
143    /// permission.  Furthermore, the client must wait 5 minutes before
144    /// binding a previously bound channel number or peer address to a
145    /// different channel, eliminating the possibility that the
146    /// transaction would initially fail but succeed on a
147    /// retransmission.
148    #[allow(clippy::let_underscore_future)]
149    fn channel_bind(&self, addr: &SessionAddr, name: &str, channel: u16) {
150        log::info!(
151            "channel bind: address={:?}, interface={:?}, username={:?}, channel={}",
152            addr.address,
153            addr.interface,
154            name,
155            channel
156        );
157
158        #[cfg(feature = "hooks")]
159        {
160            self.hooks.emit(json!({
161                "kind": "channel_bind",
162                "session": {
163                    "address": addr.address,
164                    "interface": addr.interface,
165                },
166                "username": name,
167                "channel": channel,
168            }));
169        }
170    }
171
172    /// create permission request
173    ///
174    /// [rfc8489](https://tools.ietf.org/html/rfc8489)
175    ///
176    /// When the server receives the CreatePermission request, it processes
177    /// as per [Section 5](https://tools.ietf.org/html/rfc8656#section-5)
178    /// plus the specific rules mentioned here.
179    ///
180    /// The message is checked for validity.  The CreatePermission request
181    /// MUST contain at least one XOR-PEER-ADDRESS attribute and MAY contain
182    /// multiple such attributes.  If no such attribute exists, or if any of
183    /// these attributes are invalid, then a 400 (Bad Request) error is
184    /// returned.  If the request is valid, but the server is unable to
185    /// satisfy the request due to some capacity limit or similar, then a 508
186    /// (Insufficient Capacity) error is returned.
187    ///
188    /// If an XOR-PEER-ADDRESS attribute contains an address of an address
189    /// family that is not the same as that of a relayed interface address
190    /// for the allocation, the server MUST generate an error response with
191    /// the 443 (Peer Address Family Mismatch) response code.
192    ///
193    /// The server MAY impose restrictions on the IP address allowed in the
194    /// XOR-PEER-ADDRESS attribute; if a value is not allowed, the server
195    /// rejects the request with a 403 (Forbidden) error.
196    ///
197    /// If the message is valid and the server is capable of carrying out the
198    /// request, then the server installs or refreshes a permission for the
199    /// IP address contained in each XOR-PEER-ADDRESS attribute as described
200    /// in [Section 9](https://tools.ietf.org/html/rfc8656#section-9).  
201    /// The port portion of each attribute is ignored and may be any arbitrary
202    /// value.
203    ///
204    /// The server then responds with a CreatePermission success response.
205    /// There are no mandatory attributes in the success response.
206    ///
207    /// > NOTE: A server need not do anything special to implement
208    /// idempotency of CreatePermission requests over UDP using the
209    /// "stateless stack approach".  Retransmitted CreatePermission
210    /// requests will simply refresh the permissions.
211    #[allow(clippy::let_underscore_future)]
212    fn create_permission(&self, addr: &SessionAddr, name: &str, ports: &[u16]) {
213        log::info!(
214            "create permission: address={:?}, interface={:?}, username={:?}, ports={:?}",
215            addr.address,
216            addr.interface,
217            name,
218            ports
219        );
220
221        #[cfg(feature = "hooks")]
222        {
223            self.hooks.emit(json!({
224                "kind": "create_permission",
225                "session": {
226                    "address": addr.address,
227                    "interface": addr.interface,
228                },
229                "username": name,
230                "ports": ports,
231            }));
232        }
233    }
234
235    /// refresh request
236    ///
237    /// If the server receives a Refresh Request with a REQUESTED-ADDRESS-
238    /// FAMILY attribute and the attribute value does not match the address
239    /// family of the allocation, the server MUST reply with a 443 (Peer
240    /// Address Family Mismatch) Refresh error response.
241    ///
242    /// The server computes a value called the "desired lifetime" as follows:
243    /// if the request contains a LIFETIME attribute and the attribute value
244    /// is zero, then the "desired lifetime" is zero.  Otherwise, if the
245    /// request contains a LIFETIME attribute, then the server computes the
246    /// minimum of the client's requested lifetime and the server's maximum
247    /// allowed lifetime.  If this computed value is greater than the default
248    /// lifetime, then the "desired lifetime" is the computed value.
249    /// Otherwise, the "desired lifetime" is the default lifetime.
250    ///
251    /// Subsequent processing depends on the "desired lifetime" value:
252    ///
253    /// * If the "desired lifetime" is zero, then the request succeeds and
254    /// the allocation is deleted.
255    ///
256    /// * If the "desired lifetime" is non-zero, then the request succeeds
257    /// and the allocation's time-to-expiry is set to the "desired
258    /// lifetime".
259    ///
260    /// If the request succeeds, then the server sends a success response
261    /// containing:
262    ///
263    /// * A LIFETIME attribute containing the current value of the time-to-
264    /// expiry timer.
265    ///
266    /// NOTE: A server need not do anything special to implement
267    /// idempotency of Refresh requests over UDP using the "stateless
268    /// stack approach".  Retransmitted Refresh requests with a non-
269    /// zero "desired lifetime" will simply refresh the allocation.  A
270    /// retransmitted Refresh request with a zero "desired lifetime"
271    /// will cause a 437 (Allocation Mismatch) response if the
272    /// allocation has already been deleted, but the client will treat
273    /// this as equivalent to a success response (see below).
274    #[allow(clippy::let_underscore_future)]
275    fn refresh(&self, addr: &SessionAddr, name: &str, lifetime: u32) {
276        log::info!(
277            "refresh: address={:?}, interface={:?}, username={:?}, lifetime={}",
278            addr.address,
279            addr.interface,
280            name,
281            lifetime
282        );
283
284        #[cfg(feature = "hooks")]
285        {
286            self.hooks.emit(json!({
287                "kind": "refresh",
288                "session": {
289                    "address": addr.address,
290                    "interface": addr.interface,
291                },
292                "username": name,
293                "lifetime": lifetime,
294            }));
295        }
296    }
297
298    /// session closed
299    ///
300    /// Triggered when the session leaves from the turn. Possible reasons: the
301    /// session life cycle has expired, external active deletion, or active
302    /// exit of the session.
303    #[allow(clippy::let_underscore_future)]
304    fn closed(&self, addr: &SessionAddr, name: &str) {
305        log::info!(
306            "closed: address={:?}, interface={:?}, username={:?}",
307            addr.address,
308            addr.interface,
309            name
310        );
311
312        #[cfg(feature = "api")]
313        {
314            self.statistics.unregister(&addr);
315        }
316
317        #[cfg(feature = "hooks")]
318        {
319            self.hooks.emit(json!({
320                "kind": "closed",
321                "session": {
322                    "address": addr.address,
323                    "interface": addr.interface,
324                },
325                "username": name,
326            }));
327        }
328    }
329}
330
331// https://datatracker.ietf.org/doc/html/draft-uberti-behave-turn-rest-00#section-2.2
332fn encode_password(key: &str, username: &str) -> Option<String> {
333    Some(
334        BASE64_STANDARD.encode(
335            stun::util::hmac_sha1(key.as_bytes(), &[username.as_bytes()])
336                .ok()?
337                .into_bytes()
338                .as_slice(),
339        ),
340    )
341}