medea_turn/server/
mod.rs

1//! [STUN]/[TURN] server implementation.
2//!
3//! [STUN]: https://en.wikipedia.org/wiki/STUN
4//! [TURN]: https://en.wikipedia.org/wiki/TURN
5
6mod request;
7
8use std::{collections::HashMap, sync::Arc};
9
10use derive_more::with_trait::Debug;
11use tokio::{
12    sync::{
13        broadcast::{
14            error::RecvError,
15            {self},
16        },
17        mpsc, oneshot,
18    },
19    task::JoinHandle,
20    time::Duration,
21};
22
23#[cfg(doc)]
24use crate::allocation::Allocation;
25use crate::{
26    AuthHandler, Error,
27    allocation::{FiveTuple, Info},
28    relay,
29    server::request::TurnCtx,
30    transport::{self, Transport},
31};
32
33/// Default lifetime of an [allocation][1] (10 minutes) as defined in
34/// [RFC 5766 Section 2.2][1].
35///
36/// [1]: https://tools.ietf.org/html/rfc5766#section-2.2
37pub(crate) const DEFAULT_LIFETIME: Duration = Duration::from_secs(10 * 60);
38
39/// [MTU] of UDP connections.
40///
41/// [MTU]: https://en.wikipedia.org/wiki/Maximum_transmission_unit
42pub(crate) const INBOUND_MTU: usize = 1500;
43
44/// Configuration of a [`Server`].
45#[derive(Debug)]
46pub struct Config<Auth> {
47    /// List of all [STUN]/[TURN] connections listeners.
48    ///
49    /// Each listener may have a custom behavior around the creation of
50    /// [`relay`]s.
51    ///
52    /// [STUN]: https://en.wikipedia.org/wiki/STUN
53    /// [TURN]: https://en.wikipedia.org/wiki/TURN
54    #[debug("{:?}", connections.iter()
55        .map(|c| (c.local_addr(), c.proto()))
56        .collect::<Vec<_>>())]
57    pub connections: Vec<Arc<dyn Transport + Send + Sync>>,
58
59    /// Optional [TURN] server configuration.
60    ///
61    /// Enables [TURN] support on the provided [`Transport`]s, otherwise only
62    /// [STUN] ([RFC 5389]) will be enabled.
63    ///
64    /// [TURN]: https://en.wikipedia.org/wiki/TURN
65    /// [STUN]: https://en.wikipedia.org/wiki/STUN
66    /// [RFC 5389]: https://tools.ietf.org/html/rfc5389
67    pub turn: Option<TurnConfig<Auth>>,
68}
69
70/// [TURN] server configuration.
71///
72/// [TURN]: https://en.wikipedia.org/wiki/TURN
73#[derive(Debug)]
74pub struct TurnConfig<Auth> {
75    /// [`Allocator`] of [`relay`] connections.
76    ///
77    /// [`Allocator`]: relay::Allocator
78    pub relay_addr_generator: relay::Allocator,
79
80    /// [Realm][1] of the [`Server`].
81    ///
82    /// > A string used to describe the server or a context within the server.
83    /// > The realm tells the client which username and password combination to
84    /// > use to authenticate requests.
85    ///
86    /// [1]: https://tools.ietf.org/html/rfc5766#section-3
87    pub realm: String,
88
89    /// Callback for handling incoming authentication requests, allowing users
90    /// to customize it with custom behavior.
91    pub auth_handler: Arc<Auth>,
92
93    /// Lifetime of a [channel bindings][1].
94    ///
95    /// [1]: https://tools.ietf.org/html/rfc5766#section-2.5
96    pub channel_bind_lifetime: Duration,
97
98    /// [`mpsc::Sender`] receiving notify on [allocation][1] close event, along
99    /// with metrics data.
100    ///
101    /// [1]: https://tools.ietf.org/html/rfc5766#section-2.2
102    pub alloc_close_notify: Option<mpsc::Sender<Info>>,
103}
104
105// Manual implementation is provided to omit redundant `Auth: Clone` trait bound
106// imposed by the standard derive macro.
107impl<Auth> Clone for TurnConfig<Auth> {
108    fn clone(&self) -> Self {
109        Self {
110            relay_addr_generator: self.relay_addr_generator.clone(),
111            realm: self.realm.clone(),
112            auth_handler: Arc::clone(&self.auth_handler),
113            channel_bind_lifetime: self.channel_bind_lifetime,
114            alloc_close_notify: self.alloc_close_notify.clone(),
115        }
116    }
117}
118
119/// Instance of a [STUN]/[TURN] server.
120///
121/// [STUN]: https://en.wikipedia.org/wiki/STUN
122/// [TURN]: https://en.wikipedia.org/wiki/TURN
123#[derive(Debug)]
124pub struct Server {
125    /// [`broadcast::Sender`] to this [`Server`]'s internal loop.
126    command_tx: broadcast::Sender<Command>,
127
128    /// Long-running tasks driving the [`Server`].
129    ///
130    /// Used by the [`Server::healthz()`] check.
131    runners: Vec<JoinHandle<()>>,
132}
133
134impl Server {
135    /// Creates a new [`Server`] according to the provided [`Config`], and
136    /// [`spawn`]s its internal loop.
137    ///
138    /// [`spawn`]: tokio::spawn()
139    #[must_use]
140    pub fn new<A>(config: Config<A>) -> Self
141    where
142        A: AuthHandler + Send + Sync + 'static,
143    {
144        let (command_tx, _) = broadcast::channel(16);
145        let mut runners = Vec::with_capacity(config.connections.len());
146
147        for conn in config.connections {
148            let mut turn = config.turn.clone().map(TurnCtx::from);
149
150            let mut handle_rx = command_tx.subscribe();
151
152            let (mut close_tx, mut close_rx) = oneshot::channel::<()>();
153            runners.push(tokio::spawn(async move {
154                let local_con_addr = conn.local_addr();
155                let protocol = conn.proto();
156
157                loop {
158                    let (msg, src_addr) = tokio::select! {
159                        cmd = handle_rx.recv() => {
160                            match cmd {
161                                Ok(Command::DeleteAllocations(
162                                    name,
163                                    completion,
164                                )) => {
165                                    let Some(turn) = &mut turn else {
166                                        continue;
167                                    };
168                                    turn.alloc
169                                        .delete_allocations_by_username(&name);
170                                    drop(completion);
171                                }
172                                Ok(Command::GetAllocationsInfo(
173                                    five_tuples,
174                                    tx,
175                                )) => {
176                                    let Some(turn) = &mut turn else {
177                                        drop(tx.send(HashMap::new()));
178                                        continue;
179                                    };
180                                    let infos =
181                                        turn.alloc.get_allocations_info(
182                                            five_tuples.as_ref()
183                                        );
184                                    drop(tx.send(infos).await);
185                                }
186                                Err(RecvError::Closed) => {
187                                    close_rx.close();
188                                    break;
189                                }
190                                Err(RecvError::Lagged(n)) => {
191                                    log::warn!(
192                                        "`Server` has lagged by {n} messages",
193                                    );
194                                }
195                            }
196                            continue;
197                        },
198                        v = conn.recv_from() => {
199                            match v {
200                                Ok(v) => v,
201                                Err(e) if e.is_fatal() => {
202                                    log::error!(
203                                        "Exit `Server` read loop on transport \
204                                         recv error: {e}",
205                                    );
206                                    break;
207                                }
208                                Err(e) => {
209                                    log::debug!("`Request` parse error: {e}");
210                                    continue;
211                                }
212                            }
213                        },
214                        () = close_tx.closed() => break
215                    };
216
217                    let handle = request::handle(
218                        msg,
219                        &conn,
220                        FiveTuple {
221                            src_addr,
222                            dst_addr: local_con_addr,
223                            protocol,
224                        },
225                        &mut turn,
226                    );
227                    if let Err(e) = handle.await {
228                        log::warn!("Error when handling `Request`: {e}");
229                    }
230                }
231            }));
232        }
233
234        Self { command_tx, runners }
235    }
236
237    /// Deletes all existing [allocations][1] with the provided `username`.
238    ///
239    /// # Errors
240    ///
241    /// With an [`Error::Closed`] if the [`Server`] was closed already.
242    ///
243    /// [1]: https://tools.ietf.org/html/rfc5766#section-2.2
244    pub async fn delete_allocations_by_username(
245        &self,
246        username: String,
247    ) -> Result<(), Error> {
248        let (closed_tx, closed_rx) = mpsc::channel(1);
249        #[expect(clippy::map_err_ignore, reason = "only errors on closing")]
250        let _: usize = self
251            .command_tx
252            .send(Command::DeleteAllocations(username, Arc::new(closed_rx)))
253            .map_err(|_| Error::Closed)?;
254
255        closed_tx.closed().await;
256
257        Ok(())
258    }
259
260    /// Returns [`Info`]s for the provided [`FiveTuple`]s.
261    ///
262    /// If `five_tuples` is:
263    /// - [`None`]:               It returns information about the all
264    ///   allocations.
265    /// - [`Some`] and not empty: It returns information about the allocations
266    ///   associated with the specified [`FiveTuple`]s.
267    /// - [`Some`], but empty:    It returns an empty [`HashMap`].
268    ///
269    /// # Errors
270    ///
271    /// With an [`Error::Closed`] if the [`Server`] was closed already.
272    pub async fn get_allocations_info(
273        &self,
274        five_tuples: Option<Vec<FiveTuple>>,
275    ) -> Result<HashMap<FiveTuple, Info>, Error> {
276        if let Some(five_tuples) = &five_tuples {
277            if five_tuples.is_empty() {
278                return Ok(HashMap::new());
279            }
280        }
281
282        let (infos_tx, mut infos_rx) = mpsc::channel(1);
283
284        #[expect(clippy::map_err_ignore, reason = "only errors on closing")]
285        let _: usize = self
286            .command_tx
287            .send(Command::GetAllocationsInfo(five_tuples, infos_tx))
288            .map_err(|_| Error::Closed)?;
289
290        let mut info: HashMap<FiveTuple, Info> = HashMap::new();
291        for _ in 0..self.command_tx.receiver_count() {
292            info.extend(infos_rx.recv().await.ok_or(Error::Closed)?);
293        }
294        Ok(info)
295    }
296
297    /// Checks healthiness of this [`Server`] based on whether all the
298    /// initialized long-running transport loops are still running.
299    #[must_use]
300    pub fn healthz(&self) -> bool {
301        !self.runners.iter().any(JoinHandle::is_finished)
302    }
303}
304
305/// Commands for communication between [`Server`]'s public methods and the tasks
306/// spawned in its inner loop.
307#[derive(Clone)]
308enum Command {
309    /// Delete [`Allocation`] by the provided `username`.
310    DeleteAllocations(String, Arc<mpsc::Receiver<()>>),
311
312    /// Return information about [`Allocation`] for the provided [`FiveTuple`]s.
313    GetAllocationsInfo(
314        Option<Vec<FiveTuple>>,
315        mpsc::Sender<HashMap<FiveTuple, Info>>,
316    ),
317}
318
319/// Indication whether an [`Error`] is fatal.
320///
321/// [`Error`]: std::error::Error
322trait FatalError {
323    /// Indicates whether this [`Error`] is fatal.
324    ///
325    /// [`Error`]: std::error::Error
326    fn is_fatal(&self) -> bool;
327}
328
329impl FatalError for transport::Error {
330    fn is_fatal(&self) -> bool {
331        match self {
332            Self::Io(_) | Self::TransportIsDead => true,
333            Self::ChannelData(_) | Self::Decode(_) | Self::Encode(_) => false,
334        }
335    }
336}