medea_turn/server/mod.rs
1//! [STUN]/[TURN] server implementation.
2//!
3//! [STUN]: https://en.wikipedia.org/wiki/STUN
4//! [TURN]: https://en.wikipedia.org/wiki/TURN
5
6mod request;
7
8use std::{collections::HashMap, sync::Arc};
9
10use derive_more::with_trait::Debug;
11use tokio::{
12 sync::{
13 broadcast::{
14 error::RecvError,
15 {self},
16 },
17 mpsc, oneshot,
18 },
19 task::JoinHandle,
20 time::Duration,
21};
22
23#[cfg(doc)]
24use crate::allocation::Allocation;
25use crate::{
26 AuthHandler, Error,
27 allocation::{FiveTuple, Info},
28 relay,
29 server::request::TurnCtx,
30 transport::{self, Transport},
31};
32
33/// Default lifetime of an [allocation][1] (10 minutes) as defined in
34/// [RFC 5766 Section 2.2][1].
35///
36/// [1]: https://tools.ietf.org/html/rfc5766#section-2.2
37pub(crate) const DEFAULT_LIFETIME: Duration = Duration::from_secs(10 * 60);
38
39/// [MTU] of UDP connections.
40///
41/// [MTU]: https://en.wikipedia.org/wiki/Maximum_transmission_unit
42pub(crate) const INBOUND_MTU: usize = 1500;
43
44/// Configuration of a [`Server`].
45#[derive(Debug)]
46pub struct Config<Auth> {
47 /// List of all [STUN]/[TURN] connections listeners.
48 ///
49 /// Each listener may have a custom behavior around the creation of
50 /// [`relay`]s.
51 ///
52 /// [STUN]: https://en.wikipedia.org/wiki/STUN
53 /// [TURN]: https://en.wikipedia.org/wiki/TURN
54 #[debug("{:?}", connections.iter()
55 .map(|c| (c.local_addr(), c.proto()))
56 .collect::<Vec<_>>())]
57 pub connections: Vec<Arc<dyn Transport + Send + Sync>>,
58
59 /// Optional [TURN] server configuration.
60 ///
61 /// Enables [TURN] support on the provided [`Transport`]s, otherwise only
62 /// [STUN] ([RFC 5389]) will be enabled.
63 ///
64 /// [TURN]: https://en.wikipedia.org/wiki/TURN
65 /// [STUN]: https://en.wikipedia.org/wiki/STUN
66 /// [RFC 5389]: https://tools.ietf.org/html/rfc5389
67 pub turn: Option<TurnConfig<Auth>>,
68}
69
70/// [TURN] server configuration.
71///
72/// [TURN]: https://en.wikipedia.org/wiki/TURN
73#[derive(Debug)]
74pub struct TurnConfig<Auth> {
75 /// [`Allocator`] of [`relay`] connections.
76 ///
77 /// [`Allocator`]: relay::Allocator
78 pub relay_addr_generator: relay::Allocator,
79
80 /// [Realm][1] of the [`Server`].
81 ///
82 /// > A string used to describe the server or a context within the server.
83 /// > The realm tells the client which username and password combination to
84 /// > use to authenticate requests.
85 ///
86 /// [1]: https://tools.ietf.org/html/rfc5766#section-3
87 pub realm: String,
88
89 /// Callback for handling incoming authentication requests, allowing users
90 /// to customize it with custom behavior.
91 pub auth_handler: Arc<Auth>,
92
93 /// Lifetime of a [channel bindings][1].
94 ///
95 /// [1]: https://tools.ietf.org/html/rfc5766#section-2.5
96 pub channel_bind_lifetime: Duration,
97
98 /// [`mpsc::Sender`] receiving notify on [allocation][1] close event, along
99 /// with metrics data.
100 ///
101 /// [1]: https://tools.ietf.org/html/rfc5766#section-2.2
102 pub alloc_close_notify: Option<mpsc::Sender<Info>>,
103}
104
105// Manual implementation is provided to omit redundant `Auth: Clone` trait bound
106// imposed by the standard derive macro.
107impl<Auth> Clone for TurnConfig<Auth> {
108 fn clone(&self) -> Self {
109 Self {
110 relay_addr_generator: self.relay_addr_generator.clone(),
111 realm: self.realm.clone(),
112 auth_handler: Arc::clone(&self.auth_handler),
113 channel_bind_lifetime: self.channel_bind_lifetime,
114 alloc_close_notify: self.alloc_close_notify.clone(),
115 }
116 }
117}
118
119/// Instance of a [STUN]/[TURN] server.
120///
121/// [STUN]: https://en.wikipedia.org/wiki/STUN
122/// [TURN]: https://en.wikipedia.org/wiki/TURN
123#[derive(Debug)]
124pub struct Server {
125 /// [`broadcast::Sender`] to this [`Server`]'s internal loop.
126 command_tx: broadcast::Sender<Command>,
127
128 /// Long-running tasks driving the [`Server`].
129 ///
130 /// Used by the [`Server::healthz()`] check.
131 runners: Vec<JoinHandle<()>>,
132}
133
134impl Server {
135 /// Creates a new [`Server`] according to the provided [`Config`], and
136 /// [`spawn`]s its internal loop.
137 ///
138 /// [`spawn`]: tokio::spawn()
139 #[must_use]
140 pub fn new<A>(config: Config<A>) -> Self
141 where
142 A: AuthHandler + Send + Sync + 'static,
143 {
144 let (command_tx, _) = broadcast::channel(16);
145 let mut runners = Vec::with_capacity(config.connections.len());
146
147 for conn in config.connections {
148 let mut turn = config.turn.clone().map(TurnCtx::from);
149
150 let mut handle_rx = command_tx.subscribe();
151
152 let (mut close_tx, mut close_rx) = oneshot::channel::<()>();
153 runners.push(tokio::spawn(async move {
154 let local_con_addr = conn.local_addr();
155 let protocol = conn.proto();
156
157 loop {
158 let (msg, src_addr) = tokio::select! {
159 cmd = handle_rx.recv() => {
160 match cmd {
161 Ok(Command::DeleteAllocations(
162 name,
163 completion,
164 )) => {
165 let Some(turn) = &mut turn else {
166 continue;
167 };
168 turn.alloc
169 .delete_allocations_by_username(&name);
170 drop(completion);
171 }
172 Ok(Command::GetAllocationsInfo(
173 five_tuples,
174 tx,
175 )) => {
176 let Some(turn) = &mut turn else {
177 drop(tx.send(HashMap::new()));
178 continue;
179 };
180 let infos =
181 turn.alloc.get_allocations_info(
182 five_tuples.as_ref()
183 );
184 drop(tx.send(infos).await);
185 }
186 Err(RecvError::Closed) => {
187 close_rx.close();
188 break;
189 }
190 Err(RecvError::Lagged(n)) => {
191 log::warn!(
192 "`Server` has lagged by {n} messages",
193 );
194 }
195 }
196 continue;
197 },
198 v = conn.recv_from() => {
199 match v {
200 Ok(v) => v,
201 Err(e) if e.is_fatal() => {
202 log::error!(
203 "Exit `Server` read loop on transport \
204 recv error: {e}",
205 );
206 break;
207 }
208 Err(e) => {
209 log::debug!("`Request` parse error: {e}");
210 continue;
211 }
212 }
213 },
214 () = close_tx.closed() => break
215 };
216
217 let handle = request::handle(
218 msg,
219 &conn,
220 FiveTuple {
221 src_addr,
222 dst_addr: local_con_addr,
223 protocol,
224 },
225 &mut turn,
226 );
227 if let Err(e) = handle.await {
228 log::warn!("Error when handling `Request`: {e}");
229 }
230 }
231 }));
232 }
233
234 Self { command_tx, runners }
235 }
236
237 /// Deletes all existing [allocations][1] with the provided `username`.
238 ///
239 /// # Errors
240 ///
241 /// With an [`Error::Closed`] if the [`Server`] was closed already.
242 ///
243 /// [1]: https://tools.ietf.org/html/rfc5766#section-2.2
244 pub async fn delete_allocations_by_username(
245 &self,
246 username: String,
247 ) -> Result<(), Error> {
248 let (closed_tx, closed_rx) = mpsc::channel(1);
249 #[expect(clippy::map_err_ignore, reason = "only errors on closing")]
250 let _: usize = self
251 .command_tx
252 .send(Command::DeleteAllocations(username, Arc::new(closed_rx)))
253 .map_err(|_| Error::Closed)?;
254
255 closed_tx.closed().await;
256
257 Ok(())
258 }
259
260 /// Returns [`Info`]s for the provided [`FiveTuple`]s.
261 ///
262 /// If `five_tuples` is:
263 /// - [`None`]: It returns information about the all
264 /// allocations.
265 /// - [`Some`] and not empty: It returns information about the allocations
266 /// associated with the specified [`FiveTuple`]s.
267 /// - [`Some`], but empty: It returns an empty [`HashMap`].
268 ///
269 /// # Errors
270 ///
271 /// With an [`Error::Closed`] if the [`Server`] was closed already.
272 pub async fn get_allocations_info(
273 &self,
274 five_tuples: Option<Vec<FiveTuple>>,
275 ) -> Result<HashMap<FiveTuple, Info>, Error> {
276 if let Some(five_tuples) = &five_tuples {
277 if five_tuples.is_empty() {
278 return Ok(HashMap::new());
279 }
280 }
281
282 let (infos_tx, mut infos_rx) = mpsc::channel(1);
283
284 #[expect(clippy::map_err_ignore, reason = "only errors on closing")]
285 let _: usize = self
286 .command_tx
287 .send(Command::GetAllocationsInfo(five_tuples, infos_tx))
288 .map_err(|_| Error::Closed)?;
289
290 let mut info: HashMap<FiveTuple, Info> = HashMap::new();
291 for _ in 0..self.command_tx.receiver_count() {
292 info.extend(infos_rx.recv().await.ok_or(Error::Closed)?);
293 }
294 Ok(info)
295 }
296
297 /// Checks healthiness of this [`Server`] based on whether all the
298 /// initialized long-running transport loops are still running.
299 #[must_use]
300 pub fn healthz(&self) -> bool {
301 !self.runners.iter().any(JoinHandle::is_finished)
302 }
303}
304
305/// Commands for communication between [`Server`]'s public methods and the tasks
306/// spawned in its inner loop.
307#[derive(Clone)]
308enum Command {
309 /// Delete [`Allocation`] by the provided `username`.
310 DeleteAllocations(String, Arc<mpsc::Receiver<()>>),
311
312 /// Return information about [`Allocation`] for the provided [`FiveTuple`]s.
313 GetAllocationsInfo(
314 Option<Vec<FiveTuple>>,
315 mpsc::Sender<HashMap<FiveTuple, Info>>,
316 ),
317}
318
319/// Indication whether an [`Error`] is fatal.
320///
321/// [`Error`]: std::error::Error
322trait FatalError {
323 /// Indicates whether this [`Error`] is fatal.
324 ///
325 /// [`Error`]: std::error::Error
326 fn is_fatal(&self) -> bool;
327}
328
329impl FatalError for transport::Error {
330 fn is_fatal(&self) -> bool {
331 match self {
332 Self::Io(_) | Self::TransportIsDead => true,
333 Self::ChannelData(_) | Self::Decode(_) | Self::Encode(_) => false,
334 }
335 }
336}