Skip to main content

turn_server/service/session/
mod.rs

1pub mod ports;
2
3use super::{
4    ServiceHandler,
5    session::ports::{PortAllocator, PortRange},
6};
7
8use crate::codec::{crypto::Password, message::attributes::PasswordAlgorithm};
9
10use std::{
11    hash::Hash,
12    net::SocketAddr,
13    ops::{Deref, DerefMut},
14    sync::{
15        Arc,
16        atomic::{AtomicU64, Ordering},
17    },
18    thread::{self, sleep},
19    time::Duration,
20};
21
22use ahash::{HashMap, HashMapExt};
23use parking_lot::{Mutex, RwLock, RwLockReadGuard};
24use rand::{Rng, distr::Alphanumeric};
25
26/// The identifier of the session.
27///
28/// Each session needs to be identified by a combination of three pieces of
29/// information: the source address, and the transport protocol.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
31pub struct Identifier {
32    source: SocketAddr,
33    interface: SocketAddr,
34}
35
36impl Identifier {
37    #[inline]
38    pub fn new(source: SocketAddr, interface: SocketAddr) -> Self {
39        Self { source, interface }
40    }
41
42    #[inline]
43    pub fn source(&self) -> SocketAddr {
44        self.source
45    }
46
47    #[inline]
48    pub fn interface(&self) -> SocketAddr {
49        self.interface
50    }
51
52    #[inline]
53    pub fn source_mut(&mut self) -> &mut SocketAddr {
54        &mut self.source
55    }
56
57    #[inline]
58    pub fn interface_mut(&mut self) -> &mut SocketAddr {
59        &mut self.interface
60    }
61}
62
63/// The endpoint used to record the current session.
64///
65/// This is used when forwarding data.
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
67pub struct Endpoint {
68    source: SocketAddr,
69    endpoint: SocketAddr,
70}
71
72impl Endpoint {
73    #[inline]
74    pub fn new(source: SocketAddr, endpoint: SocketAddr) -> Self {
75        Self { source, endpoint }
76    }
77
78    #[inline]
79    pub fn source(&self) -> SocketAddr {
80        self.source
81    }
82
83    #[inline]
84    pub fn endpoint(&self) -> SocketAddr {
85        self.endpoint
86    }
87
88    #[inline]
89    pub fn source_mut(&mut self) -> &mut SocketAddr {
90        &mut self.source
91    }
92
93    #[inline]
94    pub fn endpoint_mut(&mut self) -> &mut SocketAddr {
95        &mut self.endpoint
96    }
97}
98
99/// The default HashMap is created without allocating capacity. To improve
100/// performance, the turn server needs to pre-allocate the available capacity.
101///
102/// So here the HashMap is rewrapped to allocate a large capacity (number of
103/// ports that can be allocated) at the default creation time as well.
104pub struct Table<K, V>(HashMap<K, V>);
105
106impl<K, V> Default for Table<K, V> {
107    fn default() -> Self {
108        Self(HashMap::with_capacity(PortRange::default().size()))
109    }
110}
111
112impl<K, V> AsRef<HashMap<K, V>> for Table<K, V> {
113    fn as_ref(&self) -> &HashMap<K, V> {
114        &self.0
115    }
116}
117
118impl<K, V> Deref for Table<K, V> {
119    type Target = HashMap<K, V>;
120
121    fn deref(&self) -> &Self::Target {
122        &self.0
123    }
124}
125
126impl<K, V> DerefMut for Table<K, V> {
127    fn deref_mut(&mut self) -> &mut Self::Target {
128        &mut self.0
129    }
130}
131
132/// Used to lengthen the timing of the release of a readable lock guard and to
133/// provide a more convenient way for external access to the lock's internal
134/// data.
135pub struct ReadLock<'a, 'b, K, R> {
136    pub key: &'a K,
137    pub lock: RwLockReadGuard<'b, R>,
138}
139
140impl<'a, 'b, K, V> ReadLock<'a, 'b, K, Table<K, V>>
141where
142    K: Eq + Hash,
143{
144    pub fn get_ref(&self) -> Option<&V> {
145        self.lock.get(self.key)
146    }
147}
148
149/// A specially optimised timer.
150///
151/// This timer does not stack automatically and needs to be stacked externally
152/// and manually.
153///
154/// ```
155/// use turn_server::service::session::Timer;
156///
157/// let timer = Timer::default();
158///
159/// assert_eq!(timer.get(), 0);
160/// assert_eq!(timer.add(), 1);
161/// assert_eq!(timer.get(), 1);
162/// ```
163#[derive(Default)]
164pub struct Timer(AtomicU64);
165
166impl Timer {
167    pub fn get(&self) -> u64 {
168        self.0.load(Ordering::Relaxed)
169    }
170
171    pub fn add(&self) -> u64 {
172        self.0.fetch_add(1, Ordering::Relaxed) + 1
173    }
174}
175
176/// Default session lifetime in seconds (10 minutes)
177pub const DEFAULT_SESSION_LIFETIME: u64 = 600;
178
179/// turn session information.
180///
181/// A user can have many sessions.
182///
183/// The default survival time for a session is 600 seconds.
184#[derive(Debug, Clone)]
185pub enum Session {
186    New {
187        nonce: String,
188        expires: u64,
189    },
190    Authenticated {
191        nonce: String,
192        /// Authentication information for the session.
193        ///
194        /// Digest data is data that summarises usernames and passwords by means of
195        /// long-term authentication.
196        username: String,
197        password: Password,
198        /// Assignment information for the session.
199        ///
200        /// SessionManager are all bound to only one port and one channel.
201        allocate_port: Option<u16>,
202        allocate_channels: Vec<u16>,
203        permissions: Vec<u16>,
204        expires: u64,
205    },
206}
207
208impl Session {
209    /// Get the nonce of the session.
210    pub fn nonce(&self) -> &str {
211        match self {
212            Session::New { nonce, .. } | Session::Authenticated { nonce, .. } => nonce,
213        }
214    }
215
216    /// Check if the session is a new session.
217    pub fn is_new(&self) -> bool {
218        matches!(self, Session::New { .. })
219    }
220
221    /// Check if the session is an authenticated session.
222    pub fn is_authenticated(&self) -> bool {
223        matches!(self, Session::Authenticated { .. })
224    }
225}
226
227pub struct SessionManagerOptions<T> {
228    pub port_range: PortRange,
229    pub handler: T,
230}
231
232pub struct SessionManager<T> {
233    sessions: RwLock<Table<Identifier, Session>>,
234    port_allocator: Mutex<PortAllocator>,
235    // Records the sessions corresponding to each assigned port, which will be needed when looking
236    // up sessions assigned to this port based on the port number.
237    port_mapping_table: RwLock<Table</* port */ u16, Identifier>>,
238    // Stores the address to which the session should be forwarded when it sends indication to a
239    // port. This is written when permissions are created to allow a certain address to be
240    // forwarded to the current session.
241    port_relay_table: RwLock<Table<Identifier, HashMap</* port */ u16, Endpoint>>>,
242    // Indicates to which session the data sent by a session to a channel should be forwarded.
243    channel_relay_table: RwLock<Table<Identifier, HashMap</* channel */ u16, Endpoint>>>,
244    timer: Timer,
245    handler: T,
246}
247
248impl<T> SessionManager<T>
249where
250    T: ServiceHandler,
251{
252    pub fn new(options: SessionManagerOptions<T>) -> Arc<Self> {
253        let this = Arc::new(Self {
254            port_allocator: Mutex::new(PortAllocator::new(options.port_range)),
255            channel_relay_table: RwLock::new(Table::default()),
256            port_mapping_table: RwLock::new(Table::default()),
257            port_relay_table: RwLock::new(Table::default()),
258            sessions: RwLock::new(Table::default()),
259            timer: Timer::default(),
260            handler: options.handler,
261        });
262
263        // This is a background thread that silently handles expiring sessions and
264        // cleans up session information when it expires.
265        let this_ = Arc::downgrade(&this);
266        thread::spawn(move || {
267            let mut identifiers = Vec::with_capacity(255);
268
269            while let Some(this) = this_.upgrade() {
270                // The timer advances one second and gets the current time offset.
271                let now = this.timer.add();
272
273                // This is the part that deletes the session information.
274                {
275                    // Finds sessions that have expired.
276                    {
277                        this.sessions
278                            .read()
279                            .iter()
280                            .filter(|(_, v)| match v {
281                                Session::New { expires, .. }
282                                | Session::Authenticated { expires, .. } => *expires <= now,
283                            })
284                            .for_each(|(k, _)| identifiers.push(*k));
285                    }
286
287                    // Delete the expired sessions.
288                    if !identifiers.is_empty() {
289                        this.remove_session(&identifiers);
290                        identifiers.clear();
291                    }
292                }
293
294                // Fixing a second tick.
295                sleep(Duration::from_secs(1));
296            }
297        });
298
299        this
300    }
301
302    fn remove_session(&self, identifiers: &[Identifier]) {
303        let mut sessions = self.sessions.write();
304        let mut port_allocator = self.port_allocator.lock();
305        let mut port_mapping_table = self.port_mapping_table.write();
306        let mut port_relay_table = self.port_relay_table.write();
307        let mut channel_relay_table = self.channel_relay_table.write();
308
309        identifiers.iter().for_each(|k| {
310            port_relay_table.remove(k);
311            channel_relay_table.remove(k);
312
313            if let Some(Session::Authenticated {
314                allocate_port,
315                username,
316                ..
317            }) = sessions.remove(k)
318            {
319                // Removes the session-bound port from the port binding table and
320                // releases the port back into the allocation pool.
321                if let Some(port) = allocate_port {
322                    port_mapping_table.remove(&port);
323                    port_allocator.deallocate(port);
324                }
325
326                // Notifies that the external session has been closed.
327                self.handler.on_destroy(k, &username);
328            }
329        });
330    }
331
332    /// Get session for identifier.
333    ///
334    /// # Test
335    ///
336    /// ```
337    /// use turn_server::service::session::*;
338    /// use turn_server::service::*;
339    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
340    /// use turn_server::codec::crypto::Password;
341    /// use pollster::FutureExt;
342    ///
343    /// #[derive(Clone)]
344    /// struct ServiceHandlerTest;
345    ///
346    /// impl ServiceHandler for ServiceHandlerTest {
347    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
348    ///         if username == "test" {
349    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
350    ///         } else {
351    ///             None
352    ///         }
353    ///     }
354    /// }
355    ///
356    /// let identifier = Identifier::new(
357    ///     "127.0.0.1:8080".parse().unwrap(),
358    ///     "127.0.0.1:3478".parse().unwrap(),
359    /// );
360    ///
361    /// let digest = Password::Md5([
362    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
363    ///     239,
364    /// ]);
365    ///
366    /// let sessions = SessionManager::new(SessionManagerOptions {
367    ///     port_range: (49152..65535).into(),
368    ///     handler: ServiceHandlerTest,
369    /// });
370    ///
371    /// // get_session always creates a new session if it doesn't exist
372    /// {
373    ///     assert!(sessions.get_session(&identifier).get_ref().is_none());
374    /// }
375    ///
376    /// // get_session always creates a new session if it doesn't exist
377    /// {
378    ///     let lock = sessions.get_session_or_default(&identifier);
379    ///     let session = lock.get_ref().unwrap();
380    ///     match session {
381    ///         Session::New { .. } => {},
382    ///         _ => panic!("Expected new session"),
383    ///     }
384    /// }
385    ///
386    /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
387    ///
388    /// {
389    ///     let lock = sessions.get_session(&identifier);
390    ///     let session = lock.get_ref().unwrap();
391    ///     match session {
392    ///         Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
393    ///             assert_eq!(username, "test");
394    ///             assert_eq!(allocate_port, &None);
395    ///             assert_eq!(allocate_channels.len(), 0);
396    ///         }
397    ///         _ => panic!("Expected authenticated session"),
398    ///     }
399    /// }
400    /// ```
401    pub fn get_session_or_default<'a, 'b>(
402        &'a self,
403        key: &'b Identifier,
404    ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
405        {
406            let lock = self.sessions.read();
407            if lock.contains_key(key) {
408                return ReadLock {
409                    lock: self.sessions.read(),
410                    key,
411                };
412            }
413        }
414
415        {
416            self.sessions.write().insert(
417                *key,
418                Session::New {
419                    // A random string of length 16.
420                    nonce: generate_nonce(),
421                    // Current time stacks for DEFAULT_SESSION_LIFETIME seconds.
422                    expires: self.timer.get() + DEFAULT_SESSION_LIFETIME,
423                },
424            );
425        }
426
427        ReadLock {
428            lock: self.sessions.read(),
429            key,
430        }
431    }
432
433    pub fn get_session<'a, 'b>(
434        &'a self,
435        key: &'b Identifier,
436    ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
437        ReadLock {
438            lock: self.sessions.read(),
439            key,
440        }
441    }
442
443    /// Get digest for identifier.
444    ///
445    /// # Test
446    ///
447    /// ```
448    /// use turn_server::service::session::*;
449    /// use turn_server::service::*;
450    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
451    /// use turn_server::codec::crypto::Password;
452    /// use pollster::FutureExt;
453    ///
454    /// #[derive(Clone)]
455    /// struct ServiceHandlerTest;
456    ///
457    /// impl ServiceHandler for ServiceHandlerTest {
458    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
459    ///         if username == "test" {
460    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
461    ///         } else {
462    ///             None
463    ///         }
464    ///     }
465    /// }
466    ///
467    /// let identifier = Identifier::new(
468    ///     "127.0.0.1:8080".parse().unwrap(),
469    ///     "127.0.0.1:3478".parse().unwrap(),
470    /// );
471    ///
472    /// let digest = Password::Md5([
473    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
474    ///     239,
475    /// ]);
476    ///
477    /// let sessions = SessionManager::new(SessionManagerOptions {
478    ///     port_range: (49152..65535).into(),
479    ///     handler: ServiceHandlerTest,
480    /// });
481    ///
482    /// // First call get_session to create a new session
483    /// {
484    ///     sessions.get_session(&identifier);
485    /// }
486    /// assert_eq!(pollster::block_on(sessions.get_password(&identifier, "test1", PasswordAlgorithm::Md5)), None);
487    ///
488    /// // Create a new session for the next test
489    /// {
490    ///     sessions.get_session(&identifier);
491    /// }
492    /// assert_eq!(sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
493    ///
494    /// // The third call should return cached digest
495    /// assert_eq!(sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
496    /// ```
497    pub async fn get_password(
498        &self,
499        identifier: &Identifier,
500        username: &str,
501        algorithm: PasswordAlgorithm,
502    ) -> Option<Password> {
503        // Already authenticated, get the cached digest directly.
504        {
505            if let Some(Session::Authenticated { password, .. }) =
506                self.sessions.read().get(identifier)
507            {
508                return Some(*password);
509            }
510        }
511
512        // Get the current user's password from an external handler and create a
513        // digest.
514        let password = self.handler.get_password(username, algorithm).await?;
515
516        // Record a new session.
517        {
518            let mut lock = self.sessions.write();
519            let nonce = if let Some(Session::New { nonce, .. }) = lock.remove(identifier) {
520                nonce
521            } else {
522                generate_nonce()
523            };
524
525            lock.insert(
526                *identifier,
527                Session::Authenticated {
528                    allocate_channels: Vec::with_capacity(10),
529                    permissions: Vec::with_capacity(10),
530                    expires: self.timer.get() + DEFAULT_SESSION_LIFETIME,
531                    username: username.to_string(),
532                    allocate_port: None,
533                    password,
534                    nonce,
535                },
536            );
537        }
538
539        Some(password)
540    }
541
542    pub fn allocated(&self) -> usize {
543        self.port_allocator.lock().len()
544    }
545
546    /// Assign a port number to the session.
547    ///
548    /// # Test
549    ///
550    /// ```
551    /// use turn_server::service::session::*;
552    /// use turn_server::service::*;
553    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
554    /// use turn_server::codec::crypto::Password;
555    /// use pollster::FutureExt;
556    ///
557    /// #[derive(Clone)]
558    /// struct ServiceHandlerTest;
559    ///
560    /// impl ServiceHandler for ServiceHandlerTest {
561    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
562    ///         if username == "test" {
563    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
564    ///         } else {
565    ///             None
566    ///         }
567    ///     }
568    /// }
569    ///
570    /// let identifier = Identifier::new(
571    ///     "127.0.0.1:8080".parse().unwrap(),
572    ///     "127.0.0.1:3478".parse().unwrap(),
573    /// );
574    ///
575    /// let digest = Password::Md5([
576    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
577    ///     239,
578    /// ]);
579    ///
580    /// let sessions = SessionManager::new(SessionManagerOptions {
581    ///     port_range: (49152..65535).into(),
582    ///     handler: ServiceHandlerTest,
583    /// });
584    ///
585    /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
586    ///
587    /// {
588    ///     let lock = sessions.get_session(&identifier);
589    ///     let session = lock.get_ref().unwrap();
590    ///     match session {
591    ///         Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
592    ///             assert_eq!(username, "test");
593    ///             assert_eq!(allocate_port, &None);
594    ///             assert_eq!(allocate_channels.len(), 0);
595    ///         }
596    ///         _ => panic!("Expected authenticated session"),
597    ///     }
598    /// }
599    ///
600    /// let port = sessions.allocate(&identifier, None).unwrap();
601    /// {
602    ///     let lock = sessions.get_session(&identifier);
603    ///     let session = lock.get_ref().unwrap();
604    ///     match session {
605    ///         Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
606    ///             assert_eq!(username, "test");
607    ///             assert_eq!(allocate_port, &Some(port));
608    ///             assert_eq!(allocate_channels.len(), 0);
609    ///         }
610    ///         _ => panic!("Expected authenticated session"),
611    ///     }
612    /// }
613    ///
614    /// assert_eq!(sessions.allocate(&identifier, None), Some(port));
615    /// ```
616    pub fn allocate(&self, identifier: &Identifier, lifetime: Option<u32>) -> Option<u16> {
617        let mut lock = self.sessions.write();
618
619        if let Some(Session::Authenticated {
620            allocate_port,
621            expires,
622            ..
623        }) = lock.get_mut(identifier)
624        {
625            // If the port has already been allocated, re-allocation is not allowed.
626            if let Some(port) = allocate_port {
627                return Some(*port);
628            }
629
630            // Records the port assigned to the current session and resets the alive time.
631            let port = self.port_allocator.lock().allocate(None)?;
632            *allocate_port = Some(port);
633            *expires =
634                self.timer.get() + (lifetime.unwrap_or(DEFAULT_SESSION_LIFETIME as u32) as u64);
635
636            // Write the allocation port binding table.
637            self.port_mapping_table.write().insert(port, *identifier);
638            Some(port)
639        } else {
640            None
641        }
642    }
643
644    /// Create permission for session.
645    ///
646    /// # Test
647    ///
648    /// ```
649    /// use turn_server::service::session::*;
650    /// use turn_server::service::*;
651    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
652    /// use turn_server::codec::crypto::Password;
653    /// use pollster::FutureExt;
654    ///
655    /// #[derive(Clone)]
656    /// struct ServiceHandlerTest;
657    ///
658    /// impl ServiceHandler for ServiceHandlerTest {
659    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
660    ///         if username == "test" {
661    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
662    ///         } else {
663    ///             None
664    ///         }
665    ///     }
666    /// }
667    ///
668    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
669    /// let identifier = Identifier::new(
670    ///     "127.0.0.1:8080".parse().unwrap(),
671    ///     "127.0.0.1:3478".parse().unwrap(),
672    /// );
673    ///
674    /// let peer_identifier = Identifier::new(
675    ///     "127.0.0.1:8081".parse().unwrap(),
676    ///     "127.0.0.1:3478".parse().unwrap(),
677    /// );
678    ///
679    /// let digest = Password::Md5([
680    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
681    ///     239,
682    /// ]);
683    ///
684    /// let sessions = SessionManager::new(SessionManagerOptions {
685    ///     port_range: (49152..65535).into(),
686    ///     handler: ServiceHandlerTest,
687    /// });
688    ///
689    /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
690    /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
691    ///
692    /// let port = sessions.allocate(&identifier, None).unwrap();
693    /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
694    ///
695    /// assert!(!sessions.create_permission(&identifier, &endpoint, &[port]));
696    /// assert!(sessions.create_permission(&identifier, &endpoint, &[peer_port]));
697    ///
698    /// assert!(!sessions.create_permission(&peer_identifier, &endpoint, &[peer_port]));
699    /// assert!(sessions.create_permission(&peer_identifier, &endpoint, &[port]));
700    /// ```
701    pub fn create_permission(
702        &self,
703        identifier: &Identifier,
704        endpoint: &SocketAddr,
705        ports: &[u16],
706    ) -> bool {
707        let mut sessions = self.sessions.write();
708        let mut port_relay_table = self.port_relay_table.write();
709        let port_mapping_table = self.port_mapping_table.read();
710
711        // Finds information about the current session.
712        if let Some(Session::Authenticated {
713            allocate_port,
714            permissions,
715            ..
716        }) = sessions.get_mut(identifier)
717        {
718            // The port number assigned to the current session.
719            let local_port = if let Some(it) = allocate_port {
720                *it
721            } else {
722                return false;
723            };
724
725            // You cannot create permissions for yourself.
726            if ports.contains(&local_port) {
727                return false;
728            }
729
730            // Each peer port must be present.
731            let mut peers = Vec::with_capacity(15);
732            for port in ports {
733                if let Some(it) = port_mapping_table.get(port) {
734                    peers.push((it, *port));
735                } else {
736                    return false;
737                }
738            }
739
740            // Create a port forwarding mapping relationship for each peer session.
741            for (peer, port) in peers {
742                port_relay_table
743                    .entry(*peer)
744                    .or_insert_with(|| HashMap::with_capacity(20))
745                    .insert(
746                        local_port,
747                        Endpoint {
748                            source: identifier.source,
749                            endpoint: *endpoint,
750                        },
751                    );
752
753                // Do not store the same peer ports to the permission list over and over again.
754                if !permissions.contains(&port) {
755                    permissions.push(port);
756                }
757            }
758
759            true
760        } else {
761            false
762        }
763    }
764
765    /// Binding a channel to the session.
766    ///
767    /// # Test
768    ///
769    /// ```
770    /// use turn_server::service::session::*;
771    /// use turn_server::service::*;
772    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
773    /// use turn_server::codec::crypto::Password;
774    /// use pollster::FutureExt;
775    ///
776    /// #[derive(Clone)]
777    /// struct ServiceHandlerTest;
778    ///
779    /// impl ServiceHandler for ServiceHandlerTest {
780    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
781    ///         if username == "test" {
782    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
783    ///         } else {
784    ///             None
785    ///         }
786    ///     }
787    /// }
788    ///
789    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
790    /// let identifier = Identifier::new(
791    ///     "127.0.0.1:8080".parse().unwrap(),
792    ///     "127.0.0.1:3478".parse().unwrap(),
793    /// );
794    ///
795    /// let peer_identifier = Identifier::new(
796    ///     "127.0.0.1:8081".parse().unwrap(),
797    ///     "127.0.0.1:3478".parse().unwrap(),
798    /// );
799    ///
800    /// let digest = Password::Md5([
801    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
802    ///     239,
803    /// ]);
804    ///
805    /// let sessions = SessionManager::new(SessionManagerOptions {
806    ///     port_range: (49152..65535).into(),
807    ///     handler: ServiceHandlerTest,
808    /// });
809    ///
810    /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
811    /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
812    ///
813    /// let port = sessions.allocate(&identifier, None).unwrap();
814    /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
815    /// {
816    ///     assert_eq!(
817    ///         match sessions.get_session(&identifier).get_ref().unwrap() {
818    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
819    ///             _ => panic!("Expected authenticated session"),
820    ///         },
821    ///         0
822    ///     );
823    /// }
824    ///
825    /// {
826    ///     assert_eq!(
827    ///         match sessions.get_session(&peer_identifier).get_ref().unwrap() {
828    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
829    ///             _ => panic!("Expected authenticated session"),
830    ///         },
831    ///         0
832    ///     );
833    /// }
834    ///
835    /// assert!(sessions.bind_channel(&identifier, &endpoint, peer_port, 0x4000));
836    /// assert!(sessions.bind_channel(&peer_identifier, &endpoint, port, 0x4000));
837    ///
838    /// {
839    ///     assert_eq!(
840    ///         match sessions.get_session(&identifier).get_ref().unwrap() {
841    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
842    ///             _ => panic!("Expected authenticated session"),
843    ///         },
844    ///         vec![0x4000]
845    ///     );
846    /// }
847    ///
848    /// {
849    ///     assert_eq!(
850    ///         match sessions.get_session(&peer_identifier).get_ref().unwrap() {
851    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
852    ///             _ => panic!("Expected authenticated session"),
853    ///         },
854    ///         vec![0x4000]
855    ///     );
856    /// }
857    /// ```
858    pub fn bind_channel(
859        &self,
860        identifier: &Identifier,
861        endpoint: &SocketAddr,
862        port: u16,
863        channel: u16,
864    ) -> bool {
865        // Finds the address of the bound opposing port.
866        let peer = if let Some(it) = self.port_mapping_table.read().get(&port) {
867            *it
868        } else {
869            return false;
870        };
871
872        // Records the channel used for the current session.
873        {
874            let mut lock = self.sessions.write();
875            if let Some(Session::Authenticated {
876                allocate_channels, ..
877            }) = lock.get_mut(identifier)
878            {
879                if !allocate_channels.contains(&channel) {
880                    allocate_channels.push(channel);
881                }
882            } else {
883                return false;
884            };
885        }
886
887        // Binding ports also creates permissions.
888        if !self.create_permission(identifier, endpoint, &[port]) {
889            return false;
890        }
891
892        // Create channel forwarding mapping relationships for peers.
893        self.channel_relay_table
894            .write()
895            .entry(peer)
896            .or_insert_with(|| HashMap::with_capacity(10))
897            .insert(
898                channel,
899                Endpoint {
900                    source: identifier.source,
901                    endpoint: *endpoint,
902                },
903            );
904
905        true
906    }
907
908    /// Gets the peer of the current session bound channel.
909    ///
910    /// # Test
911    ///
912    /// ```
913    /// use turn_server::service::session::*;
914    /// use turn_server::service::*;
915    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
916    /// use turn_server::codec::crypto::Password;
917    /// use pollster::FutureExt;
918    ///
919    /// #[derive(Clone)]
920    /// struct ServiceHandlerTest;
921    ///
922    /// impl ServiceHandler for ServiceHandlerTest {
923    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
924    ///         if username == "test" {
925    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
926    ///         } else {
927    ///             None
928    ///         }
929    ///     }
930    /// }
931    ///
932    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
933    /// let identifier = Identifier::new(
934    ///     "127.0.0.1:8080".parse().unwrap(),
935    ///     "127.0.0.1:3478".parse().unwrap(),
936    /// );
937    ///
938    /// let peer_identifier = Identifier::new(
939    ///     "127.0.0.1:8081".parse().unwrap(),
940    ///     "127.0.0.1:3478".parse().unwrap(),
941    /// );
942    ///
943    /// let digest = Password::Md5([
944    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
945    ///     239,
946    /// ]);
947    ///
948    /// let sessions = SessionManager::new(SessionManagerOptions {
949    ///     port_range: (49152..65535).into(),
950    ///     handler: ServiceHandlerTest,
951    /// });
952    ///
953    /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
954    /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
955    ///
956    /// let port = sessions.allocate(&identifier, None).unwrap();
957    /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
958    ///
959    /// assert!(sessions.bind_channel(&identifier, &endpoint, peer_port, 0x4000));
960    /// assert!(sessions.bind_channel(&peer_identifier, &endpoint, port, 0x4000));
961    /// assert_eq!(
962    ///     sessions
963    ///         .get_channel_relay_address(&identifier, 0x4000)
964    ///         .unwrap()
965    ///         .endpoint(),
966    ///     endpoint
967    /// );
968    ///
969    /// assert_eq!(
970    ///     sessions
971    ///         .get_channel_relay_address(&peer_identifier, 0x4000)
972    ///         .unwrap()
973    ///         .endpoint(),
974    ///     endpoint
975    /// );
976    /// ```
977    pub fn get_channel_relay_address(
978        &self,
979        identifier: &Identifier,
980        channel: u16,
981    ) -> Option<Endpoint> {
982        self.channel_relay_table
983            .read()
984            .get(identifier)?
985            .get(&channel)
986            .copied()
987    }
988
989    /// Get the address of the port binding.
990    ///
991    /// # Test
992    ///
993    /// ```
994    /// use turn_server::service::session::*;
995    /// use turn_server::service::*;
996    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
997    /// use turn_server::codec::crypto::Password;
998    /// use pollster::FutureExt;
999    ///
1000    /// #[derive(Clone)]
1001    /// struct ServiceHandlerTest;
1002    ///
1003    /// impl ServiceHandler for ServiceHandlerTest {
1004    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
1005    ///         if username == "test" {
1006    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
1007    ///         } else {
1008    ///             None
1009    ///         }
1010    ///     }
1011    /// }
1012    ///
1013    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
1014    /// let identifier = Identifier::new(
1015    ///     "127.0.0.1:8080".parse().unwrap(),
1016    ///     "127.0.0.1:3478".parse().unwrap(),
1017    /// );
1018    ///
1019    /// let peer_identifier = Identifier::new(
1020    ///     "127.0.0.1:8081".parse().unwrap(),
1021    ///     "127.0.0.1:3478".parse().unwrap(),
1022    /// );
1023    ///
1024    /// let digest = Password::Md5([
1025    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
1026    ///     239,
1027    /// ]);
1028    ///
1029    /// let sessions = SessionManager::new(SessionManagerOptions {
1030    ///     port_range: (49152..65535).into(),
1031    ///     handler: ServiceHandlerTest,
1032    /// });
1033    ///
1034    /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
1035    /// sessions.get_password(&peer_identifier, "test", PasswordAlgorithm::Md5).block_on();
1036    ///
1037    /// let port = sessions.allocate(&identifier, None).unwrap();
1038    /// let peer_port = sessions.allocate(&peer_identifier, None).unwrap();
1039    ///
1040    /// assert!(sessions.create_permission(&identifier, &endpoint, &[peer_port]));
1041    /// assert!(sessions.create_permission(&peer_identifier, &endpoint, &[port]));
1042    ///
1043    /// assert_eq!(
1044    ///     sessions
1045    ///         .get_relay_address(&identifier, peer_port)
1046    ///         .unwrap()
1047    ///         .endpoint(),
1048    ///     endpoint
1049    /// );
1050    ///
1051    /// assert_eq!(
1052    ///     sessions
1053    ///         .get_relay_address(&peer_identifier, port)
1054    ///         .unwrap()
1055    ///         .endpoint(),
1056    ///     endpoint
1057    /// );
1058    /// ```
1059    pub fn get_relay_address(&self, identifier: &Identifier, port: u16) -> Option<Endpoint> {
1060        self.port_relay_table
1061            .read()
1062            .get(identifier)?
1063            .get(&port)
1064            .copied()
1065    }
1066
1067    /// Refresh the session for identifier.
1068    ///
1069    /// # Test
1070    ///
1071    /// ```
1072    /// use turn_server::service::session::*;
1073    /// use turn_server::service::*;
1074    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
1075    /// use turn_server::codec::crypto::Password;
1076    /// use pollster::FutureExt;
1077    ///
1078    /// #[derive(Clone)]
1079    /// struct ServiceHandlerTest;
1080    ///
1081    /// impl ServiceHandler for ServiceHandlerTest {
1082    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
1083    ///         if username == "test" {
1084    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
1085    ///         } else {
1086    ///             None
1087    ///         }
1088    ///     }
1089    /// }
1090    ///
1091    /// let identifier = Identifier::new(
1092    ///     "127.0.0.1:8080".parse().unwrap(),
1093    ///     "127.0.0.1:3478".parse().unwrap(),
1094    /// );
1095    ///
1096    /// let digest = Password::Md5([
1097    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
1098    ///     239,
1099    /// ]);
1100    ///
1101    /// let sessions = SessionManager::new(SessionManagerOptions {
1102    ///     port_range: (49152..65535).into(),
1103    ///     handler: ServiceHandlerTest,
1104    /// });
1105    ///
1106    /// // get_session always creates a new session if it doesn't exist
1107    /// {
1108    ///     let lock = sessions.get_session_or_default(&identifier);
1109    ///     let session = lock.get_ref().unwrap();
1110    ///     match session {
1111    ///         Session::New { .. } => {},
1112    ///         _ => panic!("Expected new session"),
1113    ///     }
1114    /// }
1115    ///
1116    /// sessions.get_password(&identifier, "test", PasswordAlgorithm::Md5).block_on();
1117    ///
1118    /// {
1119    ///     let lock = sessions.get_session(&identifier);
1120    ///     let expires = match lock.get_ref().unwrap() {
1121    ///         Session::Authenticated { expires, .. } => *expires,
1122    ///         _ => panic!("Expected authenticated session"),
1123    ///     };
1124    ///
1125    ///     assert!(expires == 600 || expires == 601 || expires == 602);
1126    /// }
1127    ///
1128    /// assert!(sessions.refresh(&identifier, 0));
1129    ///
1130    /// // After refresh with lifetime 0, session should be removed
1131    /// {
1132    ///     assert!(sessions.get_session(&identifier).get_ref().is_none());
1133    /// }
1134    /// ```
1135    pub fn refresh(&self, identifier: &Identifier, lifetime: u32) -> bool {
1136        if lifetime > 3600 {
1137            return false;
1138        }
1139
1140        if lifetime == 0 {
1141            self.remove_session(&[*identifier]);
1142        } else if let Some(Session::Authenticated { expires, .. }) =
1143            self.sessions.write().get_mut(identifier)
1144        {
1145            *expires = self.timer.get() + lifetime as u64;
1146        } else {
1147            return false;
1148        }
1149
1150        true
1151    }
1152}
1153
1154/// Generate a cryptographically random nonce for STUN/TURN authentication.
1155///
1156/// The nonce is a critical security component in STUN/TURN's long-term credential
1157/// mechanism (RFC 5389). It serves multiple purposes:
1158/// - Prevents replay attacks by ensuring each authentication is unique
1159/// - Acts as a server-issued challenge in the digest authentication flow
1160/// - Binds authentication attempts to specific sessions
1161///
1162/// This implementation generates a 16-character alphanumeric string using a
1163/// cryptographically secure random number generator. The length is chosen to
1164/// provide sufficient entropy (approximately 95 bits) to make brute-force
1165/// attacks computationally infeasible while remaining well under the RFC's
1166/// 128-character limit.
1167///
1168/// # Returns
1169/// A random 16-character string containing alphanumeric characters [a-zA-Z0-9].
1170///
1171/// # Security
1172/// Uses `rand::rng()` which provides cryptographic-quality randomness suitable
1173/// for security-sensitive operations. See RFC 7616 Section 5.4 for additional
1174/// guidance on nonce value selection.
1175fn generate_nonce() -> String {
1176    rand::rng()
1177        .sample_iter(&Alphanumeric)
1178        .take(16)
1179        .map(char::from)
1180        .collect()
1181}