turn_server/service/session/mod.rs
1pub mod ports;
2
3use super::{
4 ServiceHandler,
5 session::ports::{PortAllocator, PortRange},
6};
7
8use crate::codec::{crypto::Password, message::attributes::PasswordAlgorithm};
9
10use std::{
11 hash::Hash,
12 net::SocketAddr,
13 ops::{Deref, DerefMut},
14 sync::{
15 Arc,
16 atomic::{AtomicU64, Ordering},
17 },
18 thread::{self, sleep},
19 time::Duration,
20};
21
22use ahash::{HashMap, HashMapExt};
23use parking_lot::{Mutex, RwLock, RwLockReadGuard};
24use rand::{Rng, distr::Alphanumeric};
25
/// The identifier of a session.
///
/// A session is identified by the combination of two socket addresses: the
/// `source` address and the `interface` address (presumably the client
/// address and the local server address that received its traffic; confirm
/// against the callers).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Identifier {
    source: SocketAddr,
    interface: SocketAddr,
}

impl Identifier {
    /// Builds an identifier from a source address and an interface address.
    #[inline]
    pub fn new(source: SocketAddr, interface: SocketAddr) -> Self {
        Identifier { source, interface }
    }

    /// Returns a copy of the source address.
    #[inline]
    pub fn source(&self) -> SocketAddr {
        let Identifier { source, .. } = *self;
        source
    }

    /// Returns a copy of the interface address.
    #[inline]
    pub fn interface(&self) -> SocketAddr {
        let Identifier { interface, .. } = *self;
        interface
    }

    /// Returns a mutable reference to the source address.
    #[inline]
    pub fn source_mut(&mut self) -> &mut SocketAddr {
        let Identifier { source, .. } = self;
        source
    }

    /// Returns a mutable reference to the interface address.
    #[inline]
    pub fn interface_mut(&mut self) -> &mut SocketAddr {
        let Identifier { interface, .. } = self;
        interface
    }
}
62
/// The forwarding target recorded for a session.
///
/// Values of this type are stored in the relay tables and looked up when
/// data has to be forwarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Endpoint {
    source: SocketAddr,
    endpoint: SocketAddr,
}

impl Endpoint {
    /// Builds an endpoint from a source address and an endpoint address.
    #[inline]
    pub fn new(source: SocketAddr, endpoint: SocketAddr) -> Self {
        Endpoint { source, endpoint }
    }

    /// Returns a copy of the source address.
    #[inline]
    pub fn source(&self) -> SocketAddr {
        let Endpoint { source, .. } = *self;
        source
    }

    /// Returns a copy of the endpoint address.
    #[inline]
    pub fn endpoint(&self) -> SocketAddr {
        let Endpoint { endpoint, .. } = *self;
        endpoint
    }

    /// Returns a mutable reference to the source address.
    #[inline]
    pub fn source_mut(&mut self) -> &mut SocketAddr {
        let Endpoint { source, .. } = self;
        source
    }

    /// Returns a mutable reference to the endpoint address.
    #[inline]
    pub fn endpoint_mut(&mut self) -> &mut SocketAddr {
        let Endpoint { endpoint, .. } = self;
        endpoint
    }
}
98
99/// The default HashMap is created without allocating capacity. To improve
100/// performance, the turn server needs to pre-allocate the available capacity.
101///
102/// So here the HashMap is rewrapped to allocate a large capacity (number of
103/// ports that can be allocated) at the default creation time as well.
104pub struct Table<K, V>(HashMap<K, V>);
105
106impl<K, V> Default for Table<K, V> {
107 fn default() -> Self {
108 Self(HashMap::with_capacity(PortRange::default().size()))
109 }
110}
111
112impl<K, V> AsRef<HashMap<K, V>> for Table<K, V> {
113 fn as_ref(&self) -> &HashMap<K, V> {
114 &self.0
115 }
116}
117
118impl<K, V> Deref for Table<K, V> {
119 type Target = HashMap<K, V>;
120
121 fn deref(&self) -> &Self::Target {
122 &self.0
123 }
124}
125
126impl<K, V> DerefMut for Table<K, V> {
127 fn deref_mut(&mut self) -> &mut Self::Target {
128 &mut self.0
129 }
130}
131
132/// Used to lengthen the timing of the release of a readable lock guard and to
133/// provide a more convenient way for external access to the lock's internal
134/// data.
135pub struct ReadLock<'a, 'b, K, R> {
136 pub key: &'a K,
137 pub lock: RwLockReadGuard<'b, R>,
138}
139
140impl<'a, 'b, K, V> ReadLock<'a, 'b, K, Table<K, V>>
141where
142 K: Eq + Hash,
143{
144 pub fn get_ref(&self) -> Option<&V> {
145 self.lock.get(self.key)
146 }
147}
148
/// A specially optimised timer.
///
/// The timer is a plain atomic counter. It does not advance on its own: some
/// external driver has to call [`Timer::add`] once per tick.
///
/// ```
/// use turn_server::service::session::Timer;
///
/// let timer = Timer::default();
///
/// assert_eq!(timer.get(), 0);
/// assert_eq!(timer.add(), 1);
/// assert_eq!(timer.get(), 1);
/// ```
#[derive(Default)]
pub struct Timer(AtomicU64);

impl Timer {
    /// Returns the current tick count.
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }

    /// Advances the timer by one tick and returns the new tick count.
    ///
    /// `fetch_add` wraps around on overflow, so the returned value is
    /// computed with `wrapping_add` as well. A plain `+ 1` would panic in
    /// builds with overflow checks when the counter is at `u64::MAX`, while
    /// the stored value silently wrapped to zero.
    pub fn add(&self) -> u64 {
        self.0.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
    }
}
175
/// Default session lifetime in seconds (10 minutes).
pub const DEFAULT_SESSION_LIFETIME: u64 = 10 * 60;
178
179/// turn session information.
180///
181/// A user can have many sessions.
182///
183/// The default survival time for a session is 600 seconds.
184#[derive(Debug, Clone)]
185pub enum Session {
186 New {
187 nonce: String,
188 expires: u64,
189 },
190 Authenticated {
191 nonce: String,
192 /// Authentication information for the session.
193 ///
194 /// Digest data is data that summarises usernames and passwords by means of
195 /// long-term authentication.
196 username: String,
197 password: Password,
198 /// Assignment information for the session.
199 ///
200 /// SessionManager are all bound to only one port and one channel.
201 allocate_port: Option<u16>,
202 allocate_channels: Vec<u16>,
203 permissions: Vec<u16>,
204 expires: u64,
205 },
206}
207
208impl Session {
209 /// Get the nonce of the session.
210 pub fn nonce(&self) -> &str {
211 match self {
212 Session::New { nonce, .. } | Session::Authenticated { nonce, .. } => nonce,
213 }
214 }
215
216 /// Check if the session is a new session.
217 pub fn is_new(&self) -> bool {
218 matches!(self, Session::New { .. })
219 }
220
221 /// Check if the session is an authenticated session.
222 pub fn is_authenticated(&self) -> bool {
223 matches!(self, Session::Authenticated { .. })
224 }
225}
226
227pub struct SessionManagerOptions<T> {
228 pub port_range: PortRange,
229 pub handler: T,
230}
231
232pub struct SessionManager<T> {
233 sessions: RwLock<Table<Identifier, Session>>,
234 port_allocator: Mutex<PortAllocator>,
235 // Records the sessions corresponding to each assigned port, which will be needed when looking
236 // up sessions assigned to this port based on the port number.
237 port_mapping_table: RwLock<Table</* port */ u16, Identifier>>,
238 // Stores the address to which the session should be forwarded when it sends indication to a
239 // port. This is written when permissions are created to allow a certain address to be
240 // forwarded to the current session.
241 port_relay_table: RwLock<Table<Identifier, HashMap</* port */ u16, Endpoint>>>,
242 // Indicates to which session the data sent by a session to a channel should be forwarded.
243 channel_relay_table: RwLock<Table<Identifier, HashMap</* channel */ u16, Endpoint>>>,
244 timer: Timer,
245 handler: T,
246}
247
248impl<T> SessionManager<T>
249where
250 T: ServiceHandler,
251{
252 pub fn new(options: SessionManagerOptions<T>) -> Arc<Self> {
253 let this = Arc::new(Self {
254 port_allocator: Mutex::new(PortAllocator::new(options.port_range)),
255 channel_relay_table: RwLock::new(Table::default()),
256 port_mapping_table: RwLock::new(Table::default()),
257 port_relay_table: RwLock::new(Table::default()),
258 sessions: RwLock::new(Table::default()),
259 timer: Timer::default(),
260 handler: options.handler,
261 });
262
263 // This is a background thread that silently handles expiring sessions and
264 // cleans up session information when it expires.
265 let this_ = Arc::downgrade(&this);
266 thread::spawn(move || {
267 let mut identifiers = Vec::with_capacity(255);
268
269 while let Some(this) = this_.upgrade() {
270 // The timer advances one second and gets the current time offset.
271 let now = this.timer.add();
272
273 // This is the part that deletes the session information.
274 {
275 // Finds sessions that have expired.
276 {
277 this.sessions
278 .read()
279 .iter()
280 .filter(|(_, v)| match v {
281 Session::New { expires, .. }
282 | Session::Authenticated { expires, .. } => *expires <= now,
283 })
284 .for_each(|(k, _)| identifiers.push(*k));
285 }
286
287 // Delete the expired sessions.
288 if !identifiers.is_empty() {
289 this.remove_session(&identifiers);
290 identifiers.clear();
291 }
292 }
293
294 // Fixing a second tick.
295 sleep(Duration::from_secs(1));
296 }
297 });
298
299 this
300 }
301
302 fn remove_session(&self, identifiers: &[Identifier]) {
303 let mut sessions = self.sessions.write();
304 let mut port_allocator = self.port_allocator.lock();
305 let mut port_mapping_table = self.port_mapping_table.write();
306 let mut port_relay_table = self.port_relay_table.write();
307 let mut channel_relay_table = self.channel_relay_table.write();
308
309 identifiers.iter().for_each(|k| {
310 port_relay_table.remove(k);
311 channel_relay_table.remove(k);
312
313 if let Some(Session::Authenticated {
314 allocate_port,
315 username,
316 ..
317 }) = sessions.remove(k)
318 {
319 // Removes the session-bound port from the port binding table and
320 // releases the port back into the allocation pool.
321 if let Some(port) = allocate_port {
322 port_mapping_table.remove(&port);
323 port_allocator.deallocate(port);
324 }
325
326 // Notifies that the external session has been closed.
327 self.handler.on_destroy(k, &username);
328 }
329 });
330 }
331
332 /// Get session for identifier.
333 ///
334 /// # Test
335 ///
336 /// ```
337 /// use turn_server::service::session::*;
338 /// use turn_server::service::*;
339 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
340 /// use turn_server::codec::crypto::Password;
341 /// use pollster::FutureExt;
342 ///
343 /// #[derive(Clone)]
344 /// struct ServiceHandlerTest;
345 ///
346 /// impl ServiceHandler for ServiceHandlerTest {
347 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
348 /// if username == "test" {
349 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
350 /// } else {
351 /// None
352 /// }
353 /// }
354 /// }
355 ///
356 /// let identifier = Identifier::new(
357 /// "127.0.0.1:8080".parse().unwrap(),
358 /// "127.0.0.1:3478".parse().unwrap(),
359 /// );
360 ///
361 /// let digest = Password::Md5([
362 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
363 /// 239,
364 /// ]);
365 ///
366 /// let sessions = SessionManager::new(SessionManagerOptions {
367 /// port_range: (49152..65535).into(),
368 /// handler: ServiceHandlerTest,
369 /// });
370 ///
/// // get_session never creates a session, so nothing exists yet
372 /// {
373 /// assert!(sessions.get_session(&identifier).get_ref().is_none());
374 /// }
375 ///
/// // get_session_or_default creates a new session if it doesn't exist
377 /// {
378 /// let lock = sessions.get_session_or_default(&identifier);
379 /// let session = lock.get_ref().unwrap();
380 /// match session {
381 /// Session::New { .. } => {},
382 /// _ => panic!("Expected new session"),
383 /// }
384 /// }
385 ///
386 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
387 ///
388 /// {
389 /// let lock = sessions.get_session(&identifier);
390 /// let session = lock.get_ref().unwrap();
391 /// match session {
392 /// Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
393 /// assert_eq!(username, "test");
394 /// assert_eq!(allocate_port, &None);
395 /// assert_eq!(allocate_channels.len(), 0);
396 /// }
397 /// _ => panic!("Expected authenticated session"),
398 /// }
399 /// }
400 /// ```
401 pub fn get_session_or_default<'a, 'b>(
402 &'a self,
403 key: &'b Identifier,
404 ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
405 {
406 let lock = self.sessions.read();
407 if lock.contains_key(key) {
408 return ReadLock {
409 lock: self.sessions.read(),
410 key,
411 };
412 }
413 }
414
415 {
416 self.sessions.write().insert(
417 *key,
418 Session::New {
419 // A random string of length 16.
420 nonce: generate_nonce(),
421 // Current time stacks for DEFAULT_SESSION_LIFETIME seconds.
422 expires: self.timer.get() + DEFAULT_SESSION_LIFETIME,
423 },
424 );
425 }
426
427 ReadLock {
428 lock: self.sessions.read(),
429 key,
430 }
431 }
432
433 pub fn get_session<'a, 'b>(
434 &'a self,
435 key: &'b Identifier,
436 ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
437 ReadLock {
438 lock: self.sessions.read(),
439 key,
440 }
441 }
442
443 /// Get digest for identifier.
444 ///
445 /// # Test
446 ///
447 /// ```
448 /// use turn_server::service::session::*;
449 /// use turn_server::service::*;
450 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
451 /// use turn_server::codec::crypto::Password;
452 /// use pollster::FutureExt;
453 ///
454 /// #[derive(Clone)]
455 /// struct ServiceHandlerTest;
456 ///
457 /// impl ServiceHandler for ServiceHandlerTest {
458 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
459 /// if username == "test" {
460 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
461 /// } else {
462 /// None
463 /// }
464 /// }
465 /// }
466 ///
467 /// let identifier = Identifier::new(
468 /// "127.0.0.1:8080".parse().unwrap(),
469 /// "127.0.0.1:3478".parse().unwrap(),
470 /// );
471 ///
472 /// let digest = Password::Md5([
473 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
474 /// 239,
475 /// ]);
476 ///
477 /// let sessions = SessionManager::new(SessionManagerOptions {
478 /// port_range: (49152..65535).into(),
479 /// handler: ServiceHandlerTest,
480 /// });
481 ///
/// // get_session only looks the session up; it does not create one
483 /// {
484 /// sessions.get_session(&identifier);
485 /// }
486 /// assert_eq!(pollster::block_on(sessions.get_password(&identifier, "test1", PasswordAlgorithm::Md5)), None);
487 ///
/// // Look the session up again; get_password records the session itself on success
489 /// {
490 /// sessions.get_session(&identifier);
491 /// }
492 /// assert_eq!(sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
493 ///
494 /// // The third call should return cached digest
495 /// assert_eq!(sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
496 /// ```
497 pub async fn get_password(
498 &self,
499 identifier: &Identifier,
500 username: &str,
501 algorithm: PasswordAlgorithm,
502 ) -> Option<Password> {
503 // Already authenticated, get the cached digest directly.
504 {
505 if let Some(Session::Authenticated { password, .. }) =
506 self.sessions.read().get(identifier)
507 {
508 return Some(*password);
509 }
510 }
511
512 // Get the current user's password from an external handler and create a
513 // digest.
514 let password = self.handler.get_password(username, algorithm).await?;
515
516 // Record a new session.
517 {
518 let mut lock = self.sessions.write();
519 let nonce = if let Some(Session::New { nonce, .. }) = lock.remove(identifier) {
520 nonce
521 } else {
522 generate_nonce()
523 };
524
525 lock.insert(
526 *identifier,
527 Session::Authenticated {
528 allocate_channels: Vec::with_capacity(10),
529 permissions: Vec::with_capacity(10),
530 expires: self.timer.get() + DEFAULT_SESSION_LIFETIME,
531 username: username.to_string(),
532 allocate_port: None,
533 password,
534 nonce,
535 },
536 );
537 }
538
539 Some(password)
540 }
541
542 pub fn allocated(&self) -> usize {
543 self.port_allocator.lock().len()
544 }
545
546 /// Assign a port number to the session.
547 ///
548 /// # Test
549 ///
550 /// ```
551 /// use turn_server::service::session::*;
552 /// use turn_server::service::*;
553 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
554 /// use turn_server::codec::crypto::Password;
555 /// use pollster::FutureExt;
556 ///
557 /// #[derive(Clone)]
558 /// struct ServiceHandlerTest;
559 ///
560 /// impl ServiceHandler for ServiceHandlerTest {
561 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
562 /// if username == "test" {
563 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
564 /// } else {
565 /// None
566 /// }
567 /// }
568 /// }
569 ///
570 /// let identifier = Identifier::new(
571 /// "127.0.0.1:8080".parse().unwrap(),
572 /// "127.0.0.1:3478".parse().unwrap(),
573 /// );
574 ///
575 /// let digest = Password::Md5([
576 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
577 /// 239,
578 /// ]);
579 ///
580 /// let sessions = SessionManager::new(SessionManagerOptions {
581 /// port_range: (49152..65535).into(),
582 /// handler: ServiceHandlerTest,
583 /// });
584 ///
585 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
586 ///
587 /// {
588 /// let lock = sessions.get_session(&identifier);
589 /// let session = lock.get_ref().unwrap();
590 /// match session {
591 /// Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
592 /// assert_eq!(username, "test");
593 /// assert_eq!(allocate_port, &None);
594 /// assert_eq!(allocate_channels.len(), 0);
595 /// }
596 /// _ => panic!("Expected authenticated session"),
597 /// }
598 /// }
599 ///
600 /// let port = sessions.allocate(&identifier, None).unwrap();
601 /// {
602 /// let lock = sessions.get_session(&identifier);
603 /// let session = lock.get_ref().unwrap();
604 /// match session {
605 /// Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
606 /// assert_eq!(username, "test");
607 /// assert_eq!(allocate_port, &Some(port));
608 /// assert_eq!(allocate_channels.len(), 0);
609 /// }
610 /// _ => panic!("Expected authenticated session"),
611 /// }
612 /// }
613 ///
614 /// assert_eq!(sessions.allocate(&identifier, None), Some(port));
615 /// ```
616 pub fn allocate(&self, identifier: &Identifier, lifetime: Option<u32>) -> Option<u16> {
617 let mut lock = self.sessions.write();
618
619 if let Some(Session::Authenticated {
620 allocate_port,
621 expires,
622 ..
623 }) = lock.get_mut(identifier)
624 {
625 // If the port has already been allocated, re-allocation is not allowed.
626 if let Some(port) = allocate_port {
627 return Some(*port);
628 }
629
630 // Records the port assigned to the current session and resets the alive time.
631 let port = self.port_allocator.lock().allocate(None)?;
632 *allocate_port = Some(port);
633 *expires =
634 self.timer.get() + (lifetime.unwrap_or(DEFAULT_SESSION_LIFETIME as u32) as u64);
635
636 // Write the allocation port binding table.
637 self.port_mapping_table.write().insert(port, *identifier);
638 Some(port)
639 } else {
640 None
641 }
642 }
643
644 /// Create permission for session.
645 ///
646 /// # Test
647 ///
648 /// ```
649 /// use turn_server::service::session::*;
650 /// use turn_server::service::*;
651 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
652 /// use turn_server::codec::crypto::Password;
653 /// use pollster::FutureExt;
654 ///
655 /// #[derive(Clone)]
656 /// struct ServiceHandlerTest;
657 ///
658 /// impl ServiceHandler for ServiceHandlerTest {
659 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
660 /// if username == "test" {
661 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
662 /// } else {
663 /// None
664 /// }
665 /// }
666 /// }
667 ///
668 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
669 /// let identifier = Identifier::new(
670 /// "127.0.0.1:8080".parse().unwrap(),
671 /// "127.0.0.1:3478".parse().unwrap(),
672 /// );
673 ///
674 /// let peer_identifier = Identifier::new(
675 /// "127.0.0.1:8081".parse().unwrap(),
676 /// "127.0.0.1:3478".parse().unwrap(),
677 /// );
678 ///
679 /// let digest = Password::Md5([
680 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
681 /// 239,
682 /// ]);
683 ///
684 /// let sessions = SessionManager::new(SessionManagerOptions {
685 /// port_range: (49152..65535).into(),
686 /// handler: ServiceHandlerTest,
687 /// });
688 ///
689 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
690 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
691 ///
692 /// let port = sessions.allocate(&identifier, None).unwrap();
693 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
694 ///
695 /// assert!(!sessions.create_permission(&identifier, &endpoint, &[port]));
696 /// assert!(sessions.create_permission(&identifier, &endpoint, &[peer_port]));
697 ///
698 /// assert!(!sessions.create_permission(&peer_identifier, &endpoint, &[peer_port]));
699 /// assert!(sessions.create_permission(&peer_identifier, &endpoint, &[port]));
700 /// ```
701 pub fn create_permission(
702 &self,
703 identifier: &Identifier,
704 endpoint: &SocketAddr,
705 ports: &[u16],
706 ) -> bool {
707 let mut sessions = self.sessions.write();
708 let mut port_relay_table = self.port_relay_table.write();
709 let port_mapping_table = self.port_mapping_table.read();
710
711 // Finds information about the current session.
712 if let Some(Session::Authenticated {
713 allocate_port,
714 permissions,
715 ..
716 }) = sessions.get_mut(identifier)
717 {
718 // The port number assigned to the current session.
719 let local_port = if let Some(it) = allocate_port {
720 *it
721 } else {
722 return false;
723 };
724
725 // You cannot create permissions for yourself.
726 if ports.contains(&local_port) {
727 return false;
728 }
729
730 // Each peer port must be present.
731 let mut peers = Vec::with_capacity(15);
732 for port in ports {
733 if let Some(it) = port_mapping_table.get(port) {
734 peers.push((it, *port));
735 } else {
736 return false;
737 }
738 }
739
740 // Create a port forwarding mapping relationship for each peer session.
741 for (peer, port) in peers {
742 port_relay_table
743 .entry(*peer)
744 .or_insert_with(|| HashMap::with_capacity(20))
745 .insert(
746 local_port,
747 Endpoint {
748 source: identifier.source,
749 endpoint: *endpoint,
750 },
751 );
752
753 // Do not store the same peer ports to the permission list over and over again.
754 if !permissions.contains(&port) {
755 permissions.push(port);
756 }
757 }
758
759 true
760 } else {
761 false
762 }
763 }
764
765 /// Binding a channel to the session.
766 ///
767 /// # Test
768 ///
769 /// ```
770 /// use turn_server::service::session::*;
771 /// use turn_server::service::*;
772 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
773 /// use turn_server::codec::crypto::Password;
774 /// use pollster::FutureExt;
775 ///
776 /// #[derive(Clone)]
777 /// struct ServiceHandlerTest;
778 ///
779 /// impl ServiceHandler for ServiceHandlerTest {
780 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
781 /// if username == "test" {
782 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
783 /// } else {
784 /// None
785 /// }
786 /// }
787 /// }
788 ///
789 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
790 /// let identifier = Identifier::new(
791 /// "127.0.0.1:8080".parse().unwrap(),
792 /// "127.0.0.1:3478".parse().unwrap(),
793 /// );
794 ///
795 /// let peer_identifier = Identifier::new(
796 /// "127.0.0.1:8081".parse().unwrap(),
797 /// "127.0.0.1:3478".parse().unwrap(),
798 /// );
799 ///
800 /// let digest = Password::Md5([
801 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
802 /// 239,
803 /// ]);
804 ///
805 /// let sessions = SessionManager::new(SessionManagerOptions {
806 /// port_range: (49152..65535).into(),
807 /// handler: ServiceHandlerTest,
808 /// });
809 ///
810 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
811 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
812 ///
813 /// let port = sessions.allocate(&identifier, None).unwrap();
814 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
815 /// {
816 /// assert_eq!(
817 /// match sessions.get_session(&identifier).get_ref().unwrap() {
818 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
819 /// _ => panic!("Expected authenticated session"),
820 /// },
821 /// 0
822 /// );
823 /// }
824 ///
825 /// {
826 /// assert_eq!(
827 /// match sessions.get_session(&peer_identifier).get_ref().unwrap() {
828 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
829 /// _ => panic!("Expected authenticated session"),
830 /// },
831 /// 0
832 /// );
833 /// }
834 ///
835 /// assert!(sessions.bind_channel(&identifier, &endpoint, peer_port, 0x4000));
836 /// assert!(sessions.bind_channel(&peer_identifier, &endpoint, port, 0x4000));
837 ///
838 /// {
839 /// assert_eq!(
840 /// match sessions.get_session(&identifier).get_ref().unwrap() {
841 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
842 /// _ => panic!("Expected authenticated session"),
843 /// },
844 /// vec![0x4000]
845 /// );
846 /// }
847 ///
848 /// {
849 /// assert_eq!(
850 /// match sessions.get_session(&peer_identifier).get_ref().unwrap() {
851 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
852 /// _ => panic!("Expected authenticated session"),
853 /// },
854 /// vec![0x4000]
855 /// );
856 /// }
857 /// ```
858 pub fn bind_channel(
859 &self,
860 identifier: &Identifier,
861 endpoint: &SocketAddr,
862 port: u16,
863 channel: u16,
864 ) -> bool {
865 // Finds the address of the bound opposing port.
866 let peer = if let Some(it) = self.port_mapping_table.read().get(&port) {
867 *it
868 } else {
869 return false;
870 };
871
872 // Records the channel used for the current session.
873 {
874 let mut lock = self.sessions.write();
875 if let Some(Session::Authenticated {
876 allocate_channels, ..
877 }) = lock.get_mut(identifier)
878 {
879 if !allocate_channels.contains(&channel) {
880 allocate_channels.push(channel);
881 }
882 } else {
883 return false;
884 };
885 }
886
887 // Binding ports also creates permissions.
888 if !self.create_permission(identifier, endpoint, &[port]) {
889 return false;
890 }
891
892 // Create channel forwarding mapping relationships for peers.
893 self.channel_relay_table
894 .write()
895 .entry(peer)
896 .or_insert_with(|| HashMap::with_capacity(10))
897 .insert(
898 channel,
899 Endpoint {
900 source: identifier.source,
901 endpoint: *endpoint,
902 },
903 );
904
905 true
906 }
907
908 /// Gets the peer of the current session bound channel.
909 ///
910 /// # Test
911 ///
912 /// ```
913 /// use turn_server::service::session::*;
914 /// use turn_server::service::*;
915 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
916 /// use turn_server::codec::crypto::Password;
917 /// use pollster::FutureExt;
918 ///
919 /// #[derive(Clone)]
920 /// struct ServiceHandlerTest;
921 ///
922 /// impl ServiceHandler for ServiceHandlerTest {
923 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
924 /// if username == "test" {
925 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
926 /// } else {
927 /// None
928 /// }
929 /// }
930 /// }
931 ///
932 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
933 /// let identifier = Identifier::new(
934 /// "127.0.0.1:8080".parse().unwrap(),
935 /// "127.0.0.1:3478".parse().unwrap(),
936 /// );
937 ///
938 /// let peer_identifier = Identifier::new(
939 /// "127.0.0.1:8081".parse().unwrap(),
940 /// "127.0.0.1:3478".parse().unwrap(),
941 /// );
942 ///
943 /// let digest = Password::Md5([
944 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
945 /// 239,
946 /// ]);
947 ///
948 /// let sessions = SessionManager::new(SessionManagerOptions {
949 /// port_range: (49152..65535).into(),
950 /// handler: ServiceHandlerTest,
951 /// });
952 ///
953 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
954 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
955 ///
956 /// let port = sessions.allocate(&identifier, None).unwrap();
957 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
958 ///
959 /// assert!(sessions.bind_channel(&identifier, &endpoint, peer_port, 0x4000));
960 /// assert!(sessions.bind_channel(&peer_identifier, &endpoint, port, 0x4000));
961 /// assert_eq!(
962 /// sessions
963 /// .get_channel_relay_address(&identifier, 0x4000)
964 /// .unwrap()
965 /// .endpoint(),
966 /// endpoint
967 /// );
968 ///
969 /// assert_eq!(
970 /// sessions
971 /// .get_channel_relay_address(&peer_identifier, 0x4000)
972 /// .unwrap()
973 /// .endpoint(),
974 /// endpoint
975 /// );
976 /// ```
977 pub fn get_channel_relay_address(
978 &self,
979 identifier: &Identifier,
980 channel: u16,
981 ) -> Option<Endpoint> {
982 self.channel_relay_table
983 .read()
984 .get(identifier)?
985 .get(&channel)
986 .copied()
987 }
988
989 /// Get the address of the port binding.
990 ///
991 /// # Test
992 ///
993 /// ```
994 /// use turn_server::service::session::*;
995 /// use turn_server::service::*;
996 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
997 /// use turn_server::codec::crypto::Password;
998 /// use pollster::FutureExt;
999 ///
1000 /// #[derive(Clone)]
1001 /// struct ServiceHandlerTest;
1002 ///
1003 /// impl ServiceHandler for ServiceHandlerTest {
1004 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
1005 /// if username == "test" {
1006 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
1007 /// } else {
1008 /// None
1009 /// }
1010 /// }
1011 /// }
1012 ///
1013 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
1014 /// let identifier = Identifier::new(
1015 /// "127.0.0.1:8080".parse().unwrap(),
1016 /// "127.0.0.1:3478".parse().unwrap(),
1017 /// );
1018 ///
1019 /// let peer_identifier = Identifier::new(
1020 /// "127.0.0.1:8081".parse().unwrap(),
1021 /// "127.0.0.1:3478".parse().unwrap(),
1022 /// );
1023 ///
1024 /// let digest = Password::Md5([
1025 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
1026 /// 239,
1027 /// ]);
1028 ///
1029 /// let sessions = SessionManager::new(SessionManagerOptions {
1030 /// port_range: (49152..65535).into(),
1031 /// handler: ServiceHandlerTest,
1032 /// });
1033 ///
1034 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
1035 /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
1036 ///
1037 /// let port = sessions.allocate(&identifier, None).unwrap();
1038 /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
1039 ///
1040 /// assert!(sessions.create_permission(&identifier, &endpoint, &[peer_port]));
1041 /// assert!(sessions.create_permission(&peer_identifier, &endpoint, &[port]));
1042 ///
1043 /// assert_eq!(
1044 /// sessions
1045 /// .get_relay_address(&identifier, peer_port)
1046 /// .unwrap()
1047 /// .endpoint(),
1048 /// endpoint
1049 /// );
1050 ///
1051 /// assert_eq!(
1052 /// sessions
1053 /// .get_relay_address(&peer_identifier, port)
1054 /// .unwrap()
1055 /// .endpoint(),
1056 /// endpoint
1057 /// );
1058 /// ```
1059 pub fn get_relay_address(&self, identifier: &Identifier, port: u16) -> Option<Endpoint> {
1060 self.port_relay_table
1061 .read()
1062 .get(identifier)?
1063 .get(&port)
1064 .copied()
1065 }
1066
1067 /// Refresh the session for identifier.
1068 ///
1069 /// # Test
1070 ///
1071 /// ```
1072 /// use turn_server::service::session::*;
1073 /// use turn_server::service::*;
1074 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
1075 /// use turn_server::codec::crypto::Password;
1076 /// use pollster::FutureExt;
1077 ///
1078 /// #[derive(Clone)]
1079 /// struct ServiceHandlerTest;
1080 ///
1081 /// impl ServiceHandler for ServiceHandlerTest {
1082 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
1083 /// if username == "test" {
1084 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
1085 /// } else {
1086 /// None
1087 /// }
1088 /// }
1089 /// }
1090 ///
1091 /// let identifier = Identifier::new(
1092 /// "127.0.0.1:8080".parse().unwrap(),
1093 /// "127.0.0.1:3478".parse().unwrap(),
1094 /// );
1095 ///
1096 /// let digest = Password::Md5([
1097 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
1098 /// 239,
1099 /// ]);
1100 ///
1101 /// let sessions = SessionManager::new(SessionManagerOptions {
1102 /// port_range: (49152..65535).into(),
1103 /// handler: ServiceHandlerTest,
1104 /// });
1105 ///
/// // get_session_or_default creates a new session if it doesn't exist
1107 /// {
1108 /// let lock = sessions.get_session_or_default(&identifier);
1109 /// let session = lock.get_ref().unwrap();
1110 /// match session {
1111 /// Session::New { .. } => {},
1112 /// _ => panic!("Expected new session"),
1113 /// }
1114 /// }
1115 ///
1116 /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
1117 ///
1118 /// {
1119 /// let lock = sessions.get_session(&identifier);
1120 /// let expires = match lock.get_ref().unwrap() {
1121 /// Session::Authenticated { expires, .. } => *expires,
1122 /// _ => panic!("Expected authenticated session"),
1123 /// };
1124 ///
1125 /// assert!(expires == 600 || expires == 601 || expires == 602);
1126 /// }
1127 ///
1128 /// assert!(sessions.refresh(&identifier, 0));
1129 ///
1130 /// // After refresh with lifetime 0, session should be removed
1131 /// {
1132 /// assert!(sessions.get_session(&identifier).get_ref().is_none());
1133 /// }
1134 /// ```
1135 pub fn refresh(&self, identifier: &Identifier, lifetime: u32) -> bool {
1136 if lifetime > 3600 {
1137 return false;
1138 }
1139
1140 if lifetime == 0 {
1141 self.remove_session(&[*identifier]);
1142 } else if let Some(Session::Authenticated { expires, .. }) =
1143 self.sessions.write().get_mut(identifier)
1144 {
1145 *expires = self.timer.get() + lifetime as u64;
1146 } else {
1147 return false;
1148 }
1149
1150 true
1151 }
1152}
1153
1154/// Generate a cryptographically random nonce for STUN/TURN authentication.
1155///
1156/// The nonce is a critical security component in STUN/TURN's long-term credential
1157/// mechanism (RFC 5389). It serves multiple purposes:
1158/// - Prevents replay attacks by ensuring each authentication is unique
1159/// - Acts as a server-issued challenge in the digest authentication flow
1160/// - Binds authentication attempts to specific sessions
1161///
1162/// This implementation generates a 16-character alphanumeric string using a
1163/// cryptographically secure random number generator. The length is chosen to
1164/// provide sufficient entropy (approximately 95 bits) to make brute-force
1165/// attacks computationally infeasible while remaining well under the RFC's
1166/// 128-character limit.
1167///
1168/// # Returns
1169/// A random 16-character string containing alphanumeric characters [a-zA-Z0-9].
1170///
1171/// # Security
1172/// Uses `rand::rng()` which provides cryptographic-quality randomness suitable
1173/// for security-sensitive operations. See RFC 7616 Section 5.4 for additional
1174/// guidance on nonce value selection.
1175fn generate_nonce() -> String {
1176 rand::rng()
1177 .sample_iter(&Alphanumeric)
1178 .take(16)
1179 .map(char::from)
1180 .collect()
1181}