turn_server/service/session/mod.rs
1pub mod ports;
2
3use super::{
4 ServiceHandler, Transport,
5 session::ports::{PortAllocator, PortRange},
6};
7
8use crate::codec::{crypto::Password, message::attributes::PasswordAlgorithm};
9
10use std::{
11 hash::Hash,
12 net::SocketAddr,
13 ops::{Deref, DerefMut},
14 sync::{
15 Arc,
16 atomic::{AtomicU64, Ordering},
17 },
18 thread::{self, sleep},
19 time::Duration,
20};
21
22use ahash::{HashMap, HashMapExt};
23use parking_lot::{Mutex, RwLock, RwLockReadGuard};
24use rand::{Rng, distr::Alphanumeric};
25
26/// The identifier of the session.
27///
28/// Each session needs to be identified by a combination of three pieces of
29/// information: the source address, and the transport protocol.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
31pub struct Identifier {
32 pub source: SocketAddr,
33 pub external: SocketAddr,
34 pub interface: SocketAddr,
35 pub transport: Transport,
36}
37
38/// The default HashMap is created without allocating capacity. To improve
39/// performance, the turn server needs to pre-allocate the available capacity.
40///
41/// So here the HashMap is rewrapped to allocate a large capacity (number of
42/// ports that can be allocated) at the default creation time as well.
43pub struct Table<K, V>(HashMap<K, V>);
44
45impl<K, V> Default for Table<K, V> {
46 fn default() -> Self {
47 Self(HashMap::with_capacity(PortRange::default().size()))
48 }
49}
50
51impl<K, V> AsRef<HashMap<K, V>> for Table<K, V> {
52 fn as_ref(&self) -> &HashMap<K, V> {
53 &self.0
54 }
55}
56
57impl<K, V> Deref for Table<K, V> {
58 type Target = HashMap<K, V>;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65impl<K, V> DerefMut for Table<K, V> {
66 fn deref_mut(&mut self) -> &mut Self::Target {
67 &mut self.0
68 }
69}
70
71/// Used to lengthen the timing of the release of a readable lock guard and to
72/// provide a more convenient way for external access to the lock's internal
73/// data.
74pub struct ReadLock<'a, 'b, K, R> {
75 pub key: &'a K,
76 pub lock: RwLockReadGuard<'b, R>,
77}
78
79impl<'a, 'b, K, V> ReadLock<'a, 'b, K, Table<K, V>>
80where
81 K: Eq + Hash,
82{
83 pub fn get_ref(&self) -> Option<&V> {
84 self.lock.get(self.key)
85 }
86}
87
88/// A specially optimised timer.
89///
90/// This timer does not stack automatically and needs to be stacked externally
91/// and manually.
92///
93/// ```
94/// use turn_server::service::session::Timer;
95///
96/// let timer = Timer::default();
97///
98/// assert_eq!(timer.get(), 0);
99/// assert_eq!(timer.add(), 1);
100/// assert_eq!(timer.get(), 1);
101/// ```
102#[derive(Default)]
103pub struct Timer(AtomicU64);
104
105impl Timer {
106 pub fn get(&self) -> u64 {
107 self.0.load(Ordering::Relaxed)
108 }
109
110 pub fn add(&self) -> u64 {
111 self.0.fetch_add(1, Ordering::Relaxed) + 1
112 }
113}
114
115/// Default session lifetime in seconds (10 minutes)
116pub const DEFAULT_SESSION_LIFETIME: u64 = 600;
117
118/// turn session information.
119///
120/// A user can have many sessions.
121///
122/// The default survival time for a session is 600 seconds.
123#[derive(Debug, Clone)]
124pub enum Session {
125 New {
126 nonce: String,
127 expires: u64,
128 },
129 Authenticated {
130 nonce: String,
131 /// Authentication information for the session.
132 ///
133 /// Digest data is data that summarises usernames and passwords by means of
134 /// long-term authentication.
135 username: String,
136 password: Password,
137 /// Assignment information for the session.
138 ///
139 /// SessionManager are all bound to only one port and one channel.
140 allocated_port: Option<u16>,
141 // Stores the address to which the session should be forwarded when it sends indication to a
142 // port. This is written when permissions are created to allow a certain address to be
143 // forwarded to the current session.
144 port_relay_table: HashMap</* port */ u16, Identifier>,
145 // Indicates to which session the data sent by a session to a channel should be forwarded.
146 channel_relay_table: HashMap</* channel */ u16, Identifier>,
147 expires: u64,
148 },
149}
150
151impl Session {
152 /// Get the nonce of the session.
153 pub fn nonce(&self) -> &str {
154 match self {
155 Session::New { nonce, .. } | Session::Authenticated { nonce, .. } => nonce,
156 }
157 }
158
159 /// Check if the session is a new session.
160 pub fn is_new(&self) -> bool {
161 matches!(self, Session::New { .. })
162 }
163
164 /// Check if the session is an authenticated session.
165 pub fn is_authenticated(&self) -> bool {
166 matches!(self, Session::Authenticated { .. })
167 }
168}
169
170pub struct SessionManagerOptions<T> {
171 pub port_range: PortRange,
172 pub handler: T,
173}
174
175pub struct SessionManager<T> {
176 sessions: RwLock<Table<Identifier, Session>>,
177 port_allocator: Mutex<PortAllocator>,
178 // Records the sessions corresponding to each assigned port, which will be needed when looking
179 // up sessions assigned to this port based on the port number.
180 port_mapping_table: RwLock<Table</* port */ u16, Identifier>>,
181 timer: Timer,
182 handler: T,
183}
184
185impl<T> SessionManager<T>
186where
187 T: ServiceHandler,
188{
189 pub fn new(options: SessionManagerOptions<T>) -> Arc<Self> {
190 let this = Arc::new(Self {
191 port_allocator: Mutex::new(PortAllocator::new(options.port_range)),
192 port_mapping_table: RwLock::new(Table::default()),
193 sessions: RwLock::new(Table::default()),
194 timer: Timer::default(),
195 handler: options.handler,
196 });
197
198 // This is a background thread that silently handles expiring sessions and
199 // cleans up session information when it expires.
200 let this_ = Arc::downgrade(&this);
201 thread::spawn(move || {
202 let mut identifiers = Vec::with_capacity(255);
203
204 while let Some(this) = this_.upgrade() {
205 // The timer advances one second and gets the current time offset.
206 let now = this.timer.add();
207
208 // This is the part that deletes the session information.
209 {
210 // Finds sessions that have expired.
211 {
212 this.sessions
213 .read()
214 .iter()
215 .filter(|(_, v)| match v {
216 Session::New { expires, .. }
217 | Session::Authenticated { expires, .. } => *expires <= now,
218 })
219 .for_each(|(k, _)| identifiers.push(*k));
220 }
221
222 // Delete the expired sessions.
223 if !identifiers.is_empty() {
224 this.remove_session(&identifiers);
225 identifiers.clear();
226 }
227 }
228
229 // Fixing a second tick.
230 sleep(Duration::from_secs(1));
231 }
232 });
233
234 this
235 }
236
237 fn remove_session(&self, identifiers: &[Identifier]) {
238 let mut sessions = self.sessions.write();
239 let mut port_allocator = self.port_allocator.lock();
240 let mut port_mapping_table = self.port_mapping_table.write();
241
242 identifiers.iter().for_each(|k| {
243 if let Some(Session::Authenticated {
244 allocated_port,
245 username,
246 ..
247 }) = sessions.remove(k)
248 {
249 // Removes the session-bound port from the port binding table and
250 // releases the port back into the allocation pool.
251 if let Some(port) = allocated_port {
252 port_mapping_table.remove(&port);
253 port_allocator.deallocate(port);
254 }
255
256 // Notifies that the external session has been closed.
257 self.handler.on_destroy(k, &username);
258 }
259 });
260 }
261
262 /// Get session for identifier.
263 ///
264 /// # Test
265 ///
266 /// ```
267 /// use turn_server::service::session::*;
268 /// use turn_server::service::*;
269 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
270 /// use turn_server::codec::crypto::Password;
271 /// use pollster::FutureExt;
272 ///
273 /// #[derive(Clone)]
274 /// struct ServiceHandlerTest;
275 ///
276 /// impl ServiceHandler for ServiceHandlerTest {
277 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
278 /// if username == "test" {
279 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
280 /// } else {
281 /// None
282 /// }
283 /// }
284 /// }
285 ///
286 /// let identifier = Identifier {
287 /// source: "127.0.0.1:8080".parse().unwrap(),
288 /// external: "127.0.0.1:3478".parse().unwrap(),
289 /// interface: "127.0.0.1:3478".parse().unwrap(),
290 /// transport: Transport::Udp,
291 /// };
292 ///
293 /// let digest = Password::Md5([
294 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
295 /// 239,
296 /// ]);
297 ///
298 /// let sessions = SessionManager::new(SessionManagerOptions {
299 /// port_range: (49152..65535).into(),
300 /// handler: ServiceHandlerTest,
301 /// });
302 ///
303 /// // get_session always creates a new session if it doesn't exist
304 /// {
305 /// assert!(sessions.get_session(&identifier).get_ref().is_none());
306 /// }
307 ///
308 /// // get_session always creates a new session if it doesn't exist
309 /// {
310 /// let lock = sessions.get_session_or_default(&identifier);
311 /// let session = lock.get_ref().unwrap();
312 /// match session {
313 /// Session::New { .. } => {},
314 /// _ => panic!("Expected new session"),
315 /// }
316 /// }
317 ///
318 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
319 ///
320 /// {
321 /// let lock = sessions.get_session(&identifier);
322 /// let session = lock.get_ref().unwrap();
323 /// match session {
324 /// Session::Authenticated { username, allocated_port, channel_relay_table, .. } => {
325 /// assert_eq!(username, "test");
326 /// assert_eq!(allocated_port, &None);
327 /// assert_eq!(channel_relay_table.len(), 0);
328 /// }
329 /// _ => panic!("Expected authenticated session"),
330 /// }
331 /// }
332 /// ```
333 pub fn get_session_or_default<'a, 'b>(
334 &'a self,
335 key: &'b Identifier,
336 ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
337 {
338 let lock = self.sessions.read();
339
340 if lock.contains_key(key) {
341 return ReadLock { lock, key };
342 }
343 }
344
345 {
346 self.sessions.write().insert(
347 *key,
348 Session::New {
349 // A random string of length 16.
350 nonce: generate_nonce(),
351 // Current time stacks for DEFAULT_SESSION_LIFETIME seconds.
352 expires: self.timer.get() + DEFAULT_SESSION_LIFETIME,
353 },
354 );
355 }
356
357 ReadLock {
358 lock: self.sessions.read(),
359 key,
360 }
361 }
362
363 pub fn get_session<'a, 'b>(
364 &'a self,
365 key: &'b Identifier,
366 ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
367 ReadLock {
368 lock: self.sessions.read(),
369 key,
370 }
371 }
372
373 /// Get digest for identifier.
374 ///
375 /// # Test
376 ///
377 /// ```
378 /// use turn_server::service::session::*;
379 /// use turn_server::service::*;
380 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
381 /// use turn_server::codec::crypto::Password;
382 /// use pollster::FutureExt;
383 ///
384 /// #[derive(Clone)]
385 /// struct ServiceHandlerTest;
386 ///
387 /// impl ServiceHandler for ServiceHandlerTest {
388 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
389 /// if username == "test" {
390 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
391 /// } else {
392 /// None
393 /// }
394 /// }
395 /// }
396 ///
397 /// let identifier = Identifier {
398 /// source: "127.0.0.1:8080".parse().unwrap(),
399 /// external: "127.0.0.1:3478".parse().unwrap(),
400 /// interface: "127.0.0.1:3478".parse().unwrap(),
401 /// transport: Transport::Udp,
402 /// };
403 ///
404 /// let digest = Password::Md5([
405 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
406 /// 239,
407 /// ]);
408 ///
409 /// let sessions = SessionManager::new(SessionManagerOptions {
410 /// port_range: (49152..65535).into(),
411 /// handler: ServiceHandlerTest,
412 /// });
413 ///
414 /// // First call get_session to create a new session
415 /// {
416 /// sessions.get_session(&identifier);
417 /// }
418 /// assert_eq!(pollster::block_on(sessions.get_password(&identifier, "test1", PasswordAlgorithm::Md5)), None);
419 ///
420 /// // Create a new session for the next test
421 /// {
422 /// sessions.get_session(&identifier);
423 /// }
424 /// assert_eq!(sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
425 ///
426 /// // The third call should return cached digest
427 /// assert_eq!(sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
428 /// ```
429 pub async fn get_password(
430 &self,
431 identifier: &Identifier,
432 username: &str,
433 algorithm: PasswordAlgorithm,
434 ) -> Option<Password> {
435 // Already authenticated, get the cached digest directly.
436 {
437 if let Some(Session::Authenticated { password, .. }) =
438 self.sessions.read().get(identifier)
439 {
440 return Some(*password);
441 }
442 }
443
444 // Get the current user's password from an external handler and create a
445 // digest.
446 let password = self
447 .handler
448 .get_password(identifier, username, algorithm)
449 .await?;
450
451 // Record a new session.
452 {
453 let mut lock = self.sessions.write();
454
455 let nonce = if let Some(Session::New { nonce, .. }) = lock.remove(identifier) {
456 nonce
457 } else {
458 generate_nonce()
459 };
460
461 lock.insert(
462 *identifier,
463 Session::Authenticated {
464 port_relay_table: HashMap::with_capacity(10),
465 channel_relay_table: HashMap::with_capacity(10),
466 expires: self.timer.get() + DEFAULT_SESSION_LIFETIME,
467 username: username.to_string(),
468 allocated_port: None,
469 password,
470 nonce,
471 },
472 );
473 }
474
475 Some(password)
476 }
477
478 pub fn allocated(&self) -> usize {
479 self.port_allocator.lock().len()
480 }
481
482 /// Assign a port number to the session.
483 ///
484 /// # Test
485 ///
486 /// ```
487 /// use turn_server::service::session::*;
488 /// use turn_server::service::*;
489 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
490 /// use turn_server::codec::crypto::Password;
491 /// use pollster::FutureExt;
492 ///
493 /// #[derive(Clone)]
494 /// struct ServiceHandlerTest;
495 ///
496 /// impl ServiceHandler for ServiceHandlerTest {
497 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
498 /// if username == "test" {
499 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
500 /// } else {
501 /// None
502 /// }
503 /// }
504 /// }
505 ///
506 /// let identifier = Identifier {
507 /// source: "127.0.0.1:8080".parse().unwrap(),
508 /// external: "127.0.0.1:3478".parse().unwrap(),
509 /// interface: "127.0.0.1:3478".parse().unwrap(),
510 /// transport: Transport::Udp,
511 /// };
512 ///
513 /// let digest = Password::Md5([
514 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
515 /// 239,
516 /// ]);
517 ///
518 /// let sessions = SessionManager::new(SessionManagerOptions {
519 /// port_range: (49152..65535).into(),
520 /// handler: ServiceHandlerTest,
521 /// });
522 ///
523 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
524 ///
525 /// {
526 /// let lock = sessions.get_session(&identifier);
527 /// let session = lock.get_ref().unwrap();
528 /// match session {
529 /// Session::Authenticated { username, allocated_port, channel_relay_table, .. } => {
530 /// assert_eq!(username, "test");
531 /// assert_eq!(allocated_port, &None);
532 /// assert_eq!(channel_relay_table.len(), 0);
533 /// }
534 /// _ => panic!("Expected authenticated session"),
535 /// }
536 /// }
537 ///
538 /// let port = sessions.allocate(&identifier, None).unwrap();
539 /// {
540 /// let lock = sessions.get_session(&identifier);
541 /// let session = lock.get_ref().unwrap();
542 /// match session {
543 /// Session::Authenticated { username, allocated_port, channel_relay_table, .. } => {
544 /// assert_eq!(username, "test");
545 /// assert_eq!(allocated_port, &Some(port));
546 /// assert_eq!(channel_relay_table.len(), 0);
547 /// }
548 /// _ => panic!("Expected authenticated session"),
549 /// }
550 /// }
551 ///
552 /// assert_eq!(sessions.allocate(&identifier, None), Some(port));
553 /// ```
554 pub fn allocate(&self, identifier: &Identifier, lifetime: Option<u32>) -> Option<u16> {
555 let mut lock = self.sessions.write();
556
557 if let Some(Session::Authenticated {
558 allocated_port,
559 expires,
560 ..
561 }) = lock.get_mut(identifier)
562 {
563 // If the port has already been allocated, re-allocation is not allowed.
564 if let Some(port) = allocated_port {
565 return Some(*port);
566 }
567
568 // Records the port assigned to the current session and resets the alive time.
569 let port = self.port_allocator.lock().allocate(None)?;
570 *allocated_port = Some(port);
571 *expires =
572 self.timer.get() + (lifetime.unwrap_or(DEFAULT_SESSION_LIFETIME as u32) as u64);
573
574 // Write the allocation port binding table.
575 self.port_mapping_table.write().insert(port, *identifier);
576 Some(port)
577 } else {
578 None
579 }
580 }
581
582 /// Create permission for session.
583 ///
584 /// # Test
585 ///
586 /// ```
587 /// use turn_server::service::session::*;
588 /// use turn_server::service::*;
589 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
590 /// use turn_server::codec::crypto::Password;
591 /// use pollster::FutureExt;
592 ///
593 /// #[derive(Clone)]
594 /// struct ServiceHandlerTest;
595 ///
596 /// impl ServiceHandler for ServiceHandlerTest {
597 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
598 /// if username == "test" {
599 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
600 /// } else {
601 /// None
602 /// }
603 /// }
604 /// }
605 ///
606 /// let identifier = Identifier {
607 /// source: "127.0.0.1:8080".parse().unwrap(),
608 /// external: "127.0.0.1:3478".parse().unwrap(),
609 /// interface: "127.0.0.1:3478".parse().unwrap(),
610 /// transport: Transport::Udp,
611 /// };
612 ///
613 /// let peer_identifier = Identifier {
614 /// source: "127.0.0.1:8081".parse().unwrap(),
615 /// external: "127.0.0.1:3478".parse().unwrap(),
616 /// interface: "127.0.0.1:3478".parse().unwrap(),
617 /// transport: Transport::Udp,
618 /// };
619 ///
620 /// let digest = Password::Md5([
621 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
622 /// 239,
623 /// ]);
624 ///
625 /// let sessions = SessionManager::new(SessionManagerOptions {
626 /// port_range: (49152..65535).into(),
627 /// handler: ServiceHandlerTest,
628 /// });
629 ///
630 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
631 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
632 ///
633 /// let port = sessions.allocate(&identifier, None).unwrap();
634 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
635 ///
636 /// assert!(!sessions.create_permission(&identifier, &[port]));
637 /// assert!(sessions.create_permission(&identifier, &[peer_port]));
638 ///
639 /// assert!(!sessions.create_permission(&peer_identifier, &[peer_port]));
640 /// assert!(sessions.create_permission(&peer_identifier, &[port]));
641 /// ```
642 pub fn create_permission(&self, identifier: &Identifier, ports: &[u16]) -> bool {
643 // Finds information about the current session.
644 if let Some(Session::Authenticated {
645 allocated_port,
646 port_relay_table,
647 ..
648 }) = self.sessions.write().get_mut(identifier)
649 {
650 // The port number assigned to the current session.
651 let Some(local_port) = *allocated_port else {
652 return false;
653 };
654
655 // You cannot create permissions for yourself.
656 if ports.contains(&local_port) {
657 return false;
658 }
659
660 for port in ports {
661 if let Some(peer) = self.port_mapping_table.read().get(port) {
662 // Check if the current port is already occupied by another client.
663 if let Some(relay) = port_relay_table.get(&port)
664 && relay != peer
665 {
666 return false;
667 }
668
669 port_relay_table.insert(*port, *peer);
670 } else {
671 return false;
672 }
673 }
674
675 true
676 } else {
677 false
678 }
679 }
680
681 /// Binding a channel to the session.
682 ///
683 /// # Test
684 ///
685 /// ```
686 /// use turn_server::service::session::*;
687 /// use turn_server::service::*;
688 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
689 /// use turn_server::codec::crypto::Password;
690 /// use pollster::FutureExt;
691 ///
692 /// #[derive(Clone)]
693 /// struct ServiceHandlerTest;
694 ///
695 /// impl ServiceHandler for ServiceHandlerTest {
696 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
697 /// if username == "test" {
698 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
699 /// } else {
700 /// None
701 /// }
702 /// }
703 /// }
704 ///
705 /// let identifier = Identifier {
706 /// source: "127.0.0.1:8080".parse().unwrap(),
707 /// external: "127.0.0.1:3478".parse().unwrap(),
708 /// interface: "127.0.0.1:3478".parse().unwrap(),
709 /// transport: Transport::Udp,
710 /// };
711 ///
712 /// let peer_identifier = Identifier {
713 /// source: "127.0.0.1:8081".parse().unwrap(),
714 /// external: "127.0.0.1:3478".parse().unwrap(),
715 /// interface: "127.0.0.1:3478".parse().unwrap(),
716 /// transport: Transport::Udp,
717 /// };
718 ///
719 /// let digest = Password::Md5([
720 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
721 /// 239,
722 /// ]);
723 ///
724 /// let sessions = SessionManager::new(SessionManagerOptions {
725 /// port_range: (49152..65535).into(),
726 /// handler: ServiceHandlerTest,
727 /// });
728 ///
729 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
730 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
731 ///
732 /// let port = sessions.allocate(&identifier, None).unwrap();
733 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
734 /// {
735 /// assert_eq!(
736 /// match sessions.get_session(&identifier).get_ref().unwrap() {
737 /// Session::Authenticated { channel_relay_table, .. } => channel_relay_table.len(),
738 /// _ => panic!("Expected authenticated session"),
739 /// },
740 /// 0
741 /// );
742 /// }
743 ///
744 /// {
745 /// assert_eq!(
746 /// match sessions.get_session(&peer_identifier).get_ref().unwrap() {
747 /// Session::Authenticated { channel_relay_table, .. } => channel_relay_table.len(),
748 /// _ => panic!("Expected authenticated session"),
749 /// },
750 /// 0
751 /// );
752 /// }
753 ///
754 /// assert!(sessions.bind_channel(&identifier, peer_port, 0x4000));
755 /// assert!(sessions.bind_channel(&peer_identifier, port, 0x4000));
756 ///
757 /// {
758 /// assert!(
759 /// match sessions.get_session(&identifier).get_ref().unwrap() {
760 /// Session::Authenticated { channel_relay_table, .. } => channel_relay_table.contains_key(&0x4000),
761 /// _ => panic!("Expected authenticated session"),
762 /// }
763 /// );
764 /// }
765 ///
766 /// {
767 /// assert!(
768 /// match sessions.get_session(&peer_identifier).get_ref().unwrap() {
769 /// Session::Authenticated { channel_relay_table, .. } => channel_relay_table.contains_key(&0x4000),
770 /// _ => panic!("Expected authenticated session"),
771 /// }
772 /// );
773 /// }
774 /// ```
775 pub fn bind_channel(&self, identifier: &Identifier, port: u16, channel: u16) -> bool {
776 // Records the channel used for the current session.
777 {
778 if let Some(Session::Authenticated {
779 channel_relay_table,
780 ..
781 }) = self.sessions.write().get_mut(identifier)
782 {
783 // Finds the address of the bound opposing port.
784 if let Some(peer) = self.port_mapping_table.read().get(&port) {
785 // Check if the current channel is already occupied by another client.
786 if let Some(relay) = channel_relay_table.get(&channel)
787 && relay != peer
788 {
789 return false;
790 }
791
792 // Create channel forwarding mapping relationships for peers.
793 channel_relay_table.insert(channel, *peer);
794 } else {
795 return false;
796 };
797 } else {
798 return false;
799 };
800 }
801
802 // Binding ports also creates permissions.
803 if !self.create_permission(identifier, &[port]) {
804 return false;
805 }
806
807 true
808 }
809
810 /// Gets the peer of the current session bound channel.
811 ///
812 /// # Test
813 ///
814 /// ```
815 /// use turn_server::service::session::*;
816 /// use turn_server::service::*;
817 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
818 /// use turn_server::codec::crypto::Password;
819 /// use pollster::FutureExt;
820 ///
821 /// #[derive(Clone)]
822 /// struct ServiceHandlerTest;
823 ///
824 /// impl ServiceHandler for ServiceHandlerTest {
825 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
826 /// if username == "test" {
827 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
828 /// } else {
829 /// None
830 /// }
831 /// }
832 /// }
833 ///
834 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
835 /// let identifier = Identifier {
836 /// source: "127.0.0.1:8080".parse().unwrap(),
837 /// external: "127.0.0.1:3478".parse().unwrap(),
838 /// interface: "127.0.0.1:3478".parse().unwrap(),
839 /// transport: Transport::Udp,
840 /// };
841 ///
842 /// let peer_identifier = Identifier {
843 /// source: "127.0.0.1:8081".parse().unwrap(),
844 /// external: "127.0.0.1:3478".parse().unwrap(),
845 /// interface: "127.0.0.1:3478".parse().unwrap(),
846 /// transport: Transport::Udp,
847 /// };
848 ///
849 /// let digest = Password::Md5([
850 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
851 /// 239,
852 /// ]);
853 ///
854 /// let sessions = SessionManager::new(SessionManagerOptions {
855 /// port_range: (49152..65535).into(),
856 /// handler: ServiceHandlerTest,
857 /// });
858 ///
859 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
860 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
861 ///
862 /// let port = sessions.allocate(&identifier, None).unwrap();
863 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
864 ///
865 /// assert!(sessions.bind_channel(&identifier, peer_port, 0x4000));
866 /// assert!(sessions.bind_channel(&peer_identifier, port, 0x4000));
867 /// assert_eq!(
868 /// sessions
869 /// .get_channel_relay_address(&identifier, 0x4000)
870 /// .unwrap()
871 /// .1
872 /// .interface,
873 /// endpoint
874 /// );
875 ///
876 /// assert_eq!(
877 /// sessions
878 /// .get_channel_relay_address(&peer_identifier, 0x4000)
879 /// .unwrap()
880 /// .1
881 /// .interface,
882 /// endpoint
883 /// );
884 /// ```
885 pub fn get_channel_relay_address(
886 &self,
887 identifier: &Identifier,
888 channel: u16,
889 ) -> Option<(/* peer channel */ u16, Identifier)> {
890 let session = self.sessions.read();
891
892 if let Session::Authenticated {
893 channel_relay_table,
894 ..
895 } = session.get(identifier)?
896 {
897 let peer = channel_relay_table.get(&channel)?;
898
899 if let Session::Authenticated {
900 channel_relay_table,
901 ..
902 } = session.get(peer)?
903 {
904 // Find the channel bound to the current client.
905 let (peer_channel, _) =
906 channel_relay_table.iter().find(|(_, v)| *v == identifier)?;
907
908 Some((*peer_channel, *peer))
909 } else {
910 None
911 }
912 } else {
913 None
914 }
915 }
916
917 /// Get the address of the port binding.
918 ///
919 /// # Test
920 ///
921 /// ```
922 /// use turn_server::service::session::*;
923 /// use turn_server::service::*;
924 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
925 /// use turn_server::codec::crypto::Password;
926 /// use pollster::FutureExt;
927 ///
928 /// #[derive(Clone)]
929 /// struct ServiceHandlerTest;
930 ///
931 /// impl ServiceHandler for ServiceHandlerTest {
932 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
933 /// if username == "test" {
934 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
935 /// } else {
936 /// None
937 /// }
938 /// }
939 /// }
940 ///
941 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
942 /// let identifier = Identifier {
943 /// source: "127.0.0.1:8080".parse().unwrap(),
944 /// external: "127.0.0.1:3478".parse().unwrap(),
945 /// interface: "127.0.0.1:3478".parse().unwrap(),
946 /// transport: Transport::Udp,
947 /// };
948 ///
949 /// let peer_identifier = Identifier {
950 /// source: "127.0.0.1:8081".parse().unwrap(),
951 /// external: "127.0.0.1:3478".parse().unwrap(),
952 /// interface: "127.0.0.1:3478".parse().unwrap(),
953 /// transport: Transport::Udp,
954 /// };
955 ///
956 /// let digest = Password::Md5([
957 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
958 /// 239,
959 /// ]);
960 ///
961 /// let sessions = SessionManager::new(SessionManagerOptions {
962 /// port_range: (49152..65535).into(),
963 /// handler: ServiceHandlerTest,
964 /// });
965 ///
966 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
967 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
968 ///
969 /// let port = sessions.allocate(&identifier, None).unwrap();
970 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
971 ///
972 /// assert!(sessions.create_permission(&identifier, &[peer_port]));
973 /// assert!(sessions.create_permission(&peer_identifier, &[port]));
974 ///
975 /// assert_eq!(
976 /// sessions
977 /// .get_port_relay_address(&identifier, peer_port)
978 /// .unwrap()
979 /// .1
980 /// .interface,
981 /// endpoint
982 /// );
983 ///
984 /// assert_eq!(
985 /// sessions
986 /// .get_port_relay_address(&peer_identifier, port)
987 /// .unwrap()
988 /// .1
989 /// .interface,
990 /// endpoint
991 /// );
992 /// ```
993 pub fn get_port_relay_address(
994 &self,
995 identifier: &Identifier,
996 port: u16,
997 ) -> Option<(/* local port */ u16, Identifier)> {
998 if let Session::Authenticated {
999 port_relay_table,
1000 allocated_port,
1001 ..
1002 } = self.sessions.read().get(identifier)?
1003 {
1004 Some(((*allocated_port)?, port_relay_table.get(&port).copied()?))
1005 } else {
1006 None
1007 }
1008 }
1009
1010 /// Refresh the session for identifier.
1011 ///
1012 /// # Test
1013 ///
1014 /// ```
1015 /// use turn_server::service::session::*;
1016 /// use turn_server::service::*;
1017 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
1018 /// use turn_server::codec::crypto::Password;
1019 /// use pollster::FutureExt;
1020 ///
1021 /// #[derive(Clone)]
1022 /// struct ServiceHandlerTest;
1023 ///
1024 /// impl ServiceHandler for ServiceHandlerTest {
1025 /// async fn get_password(&self, id: &Identifier, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
1026 /// if username == "test" {
1027 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
1028 /// } else {
1029 /// None
1030 /// }
1031 /// }
1032 /// }
1033 ///
1034 /// let identifier = Identifier {
1035 /// source: "127.0.0.1:8080".parse().unwrap(),
1036 /// external: "127.0.0.1:3478".parse().unwrap(),
1037 /// interface: "127.0.0.1:3478".parse().unwrap(),
1038 /// transport: Transport::Udp,
1039 /// };
1040 ///
1041 /// let digest = Password::Md5([
1042 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
1043 /// 239,
1044 /// ]);
1045 ///
1046 /// let sessions = SessionManager::new(SessionManagerOptions {
1047 /// port_range: (49152..65535).into(),
1048 /// handler: ServiceHandlerTest,
1049 /// });
1050 ///
1051 /// // get_session always creates a new session if it doesn't exist
1052 /// {
1053 /// let lock = sessions.get_session_or_default(&identifier);
1054 /// let session = lock.get_ref().unwrap();
1055 /// match session {
1056 /// Session::New { .. } => {},
1057 /// _ => panic!("Expected new session"),
1058 /// }
1059 /// }
1060 ///
1061 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
1062 ///
1063 /// {
1064 /// let lock = sessions.get_session(&identifier);
1065 /// let expires = match lock.get_ref().unwrap() {
1066 /// Session::Authenticated { expires, .. } => *expires,
1067 /// _ => panic!("Expected authenticated session"),
1068 /// };
1069 ///
1070 /// assert!(expires == 600 || expires == 601 || expires == 602);
1071 /// }
1072 ///
1073 /// assert!(sessions.refresh(&identifier, 0));
1074 ///
1075 /// // After refresh with lifetime 0, session should be removed
1076 /// {
1077 /// assert!(sessions.get_session(&identifier).get_ref().is_none());
1078 /// }
1079 /// ```
1080 pub fn refresh(&self, identifier: &Identifier, lifetime: u32) -> bool {
1081 if lifetime > 3600 {
1082 return false;
1083 }
1084
1085 if lifetime == 0 {
1086 self.remove_session(&[*identifier]);
1087 } else if let Some(Session::Authenticated { expires, .. }) =
1088 self.sessions.write().get_mut(identifier)
1089 {
1090 *expires = self.timer.get() + lifetime as u64;
1091 } else {
1092 return false;
1093 }
1094
1095 true
1096 }
1097}
1098
1099/// Generate a cryptographically random nonce for STUN/TURN authentication.
1100///
1101/// The nonce is a critical security component in STUN/TURN's long-term credential
1102/// mechanism (RFC 5389). It serves multiple purposes:
1103/// - Prevents replay attacks by ensuring each authentication is unique
1104/// - Acts as a server-issued challenge in the digest authentication flow
1105/// - Binds authentication attempts to specific sessions
1106///
1107/// This implementation generates a 16-character alphanumeric string using a
1108/// cryptographically secure random number generator. The length is chosen to
1109/// provide sufficient entropy (approximately 95 bits) to make brute-force
1110/// attacks computationally infeasible while remaining well under the RFC's
1111/// 128-character limit.
1112///
1113/// # Returns
1114/// A random 16-character string containing alphanumeric characters [a-zA-Z0-9].
1115///
1116/// # Security
1117/// Uses `rand::rng()` which provides cryptographic-quality randomness suitable
1118/// for security-sensitive operations. See RFC 7616 Section 5.4 for additional
1119/// guidance on nonce value selection.
1120fn generate_nonce() -> String {
1121 rand::rng()
1122 .sample_iter(&Alphanumeric)
1123 .take(16)
1124 .map(char::from)
1125 .collect()
1126}