turn_server/service/session/
mod.rs

1pub mod ports;
2
3use super::{
4    ServiceHandler,
5    session::ports::{PortAllocator, PortRange},
6};
7
8use crate::codec::{crypto::Password, message::attributes::PasswordAlgorithm};
9
10use std::{
11    hash::Hash,
12    net::SocketAddr,
13    ops::{Deref, DerefMut},
14    sync::{
15        Arc,
16        atomic::{AtomicU64, Ordering},
17    },
18    thread::{self, sleep},
19    time::Duration,
20};
21
22use ahash::{HashMap, HashMapExt};
23use parking_lot::{Mutex, RwLock, RwLockReadGuard};
24use rand::{Rng, distr::Alphanumeric};
25
/// The identifier of a session.
///
/// A session is keyed by the pair of socket addresses stored here. The type is
/// `Copy` and hashable so it can be used directly as a table key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Identifier {
    /// NOTE(review): presumably the remote (client) address the traffic came
    /// from; confirm against the callers that build this value.
    pub source: SocketAddr,
    /// NOTE(review): presumably the local server interface that received the
    /// traffic; confirm against the callers that build this value.
    pub interface: SocketAddr,
}
35
/// A forwarding target recorded in the relay tables.
///
/// This is used when forwarding data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Endpoint {
    /// The `source` address of the session that created the relay entry.
    pub source: SocketAddr,
    /// The socket address supplied by the caller when the permission or
    /// channel binding was created.
    pub endpoint: SocketAddr,
}
44
/// A `HashMap` wrapper whose `Default` implementation pre-allocates capacity.
///
/// A plain `HashMap` is created without any allocated capacity. To avoid
/// repeated growth, this wrapper reserves room for as many entries as the
/// default port range contains at construction time.
pub struct Table<K, V>(HashMap<K, V>);
51
52impl<K, V> Default for Table<K, V> {
53    fn default() -> Self {
54        Self(HashMap::with_capacity(PortRange::default().size()))
55    }
56}
57
58impl<K, V> AsRef<HashMap<K, V>> for Table<K, V> {
59    fn as_ref(&self) -> &HashMap<K, V> {
60        &self.0
61    }
62}
63
64impl<K, V> Deref for Table<K, V> {
65    type Target = HashMap<K, V>;
66
67    fn deref(&self) -> &Self::Target {
68        &self.0
69    }
70}
71
72impl<K, V> DerefMut for Table<K, V> {
73    fn deref_mut(&mut self) -> &mut Self::Target {
74        &mut self.0
75    }
76}
77
/// Keeps a read guard alive together with the key the caller is interested
/// in, so the guarded data can be inspected for as long as this value lives.
///
/// The lock stays read-locked until the `ReadLock` is dropped, so keep its
/// scope short to avoid blocking writers.
pub struct ReadLock<'a, 'b, K, R> {
    /// The key that will be looked up in the guarded data.
    pub key: &'a K,
    /// The read guard that keeps the underlying lock held.
    pub lock: RwLockReadGuard<'b, R>,
}
85
86impl<'a, 'b, K, V> ReadLock<'a, 'b, K, Table<K, V>>
87where
88    K: Eq + Hash,
89{
90    pub fn get_ref(&self) -> Option<&V> {
91        self.lock.get(self.key)
92    }
93}
94
/// A minimal tick counter backed by an atomic integer.
///
/// The counter never advances by itself: whoever owns the timer must call
/// [`Timer::add`] for every tick that elapses.
///
/// ```
/// use turn_server::service::session::Timer;
///
/// let timer = Timer::default();
///
/// assert_eq!(timer.get(), 0);
/// assert_eq!(timer.add(), 1);
/// assert_eq!(timer.get(), 1);
/// ```
#[derive(Default)]
pub struct Timer(AtomicU64);

impl Timer {
    /// Returns the current tick count.
    pub fn get(&self) -> u64 {
        let Self(ticks) = self;
        ticks.load(Ordering::Relaxed)
    }

    /// Advances the counter by one tick and returns the new tick count.
    pub fn add(&self) -> u64 {
        let Self(ticks) = self;
        // `fetch_add` yields the value before the increment.
        let previous = ticks.fetch_add(1, Ordering::Relaxed);
        previous + 1
    }
}
121
/// turn session information.
///
/// A user can have many sessions.
///
/// The default survival time for a session is 600 seconds.
#[derive(Debug, Clone)]
pub enum Session {
    /// A session that exists but has not been authenticated yet.
    New {
        /// The nonce issued to this session.
        nonce: String,
        /// Timer tick at which the session expires; the cleanup thread removes
        /// the session once the timer reaches this value.
        expires: u64,
    },
    /// A session for which a password has been obtained from the handler.
    Authenticated {
        /// The nonce issued to this session; carried over from the `New`
        /// state when one existed.
        nonce: String,
        /// Authentication information for the session.
        ///
        /// The user name the session was authenticated with.
        username: String,
        /// The password (digest) returned by the handler for `username`; it is
        /// cached here so later lookups do not call the handler again.
        password: Password,
        /// Assignment information for the session.
        ///
        /// The port allocated to this session, if any. A session holds at most
        /// one allocated port.
        allocate_port: Option<u16>,
        /// Channel numbers this session has bound.
        allocate_channels: Vec<u16>,
        /// Peer ports this session has created permissions for.
        permissions: Vec<u16>,
        /// Timer tick at which the session expires; the cleanup thread removes
        /// the session once the timer reaches this value.
        expires: u64,
    },
}
150
151impl Session {
152    /// Get the nonce of the session.
153    pub fn nonce(&self) -> &str {
154        match self {
155            Session::New { nonce, .. } | Session::Authenticated { nonce, .. } => nonce,
156        }
157    }
158
159    /// Check if the session is a new session.
160    pub fn is_new(&self) -> bool {
161        matches!(self, Session::New { .. })
162    }
163
164    /// Check if the session is an authenticated session.
165    pub fn is_authenticated(&self) -> bool {
166        matches!(self, Session::Authenticated { .. })
167    }
168}
169
/// Construction parameters for [`SessionManager`].
pub struct SessionManagerOptions<T> {
    /// The range of ports handed to the port allocator.
    pub port_range: PortRange,
    /// The handler used to look up passwords and to be notified when a
    /// session is destroyed.
    pub handler: T,
}
174
/// Owns all session state: the sessions themselves, the port allocator and
/// the lookup tables used to relay traffic between sessions.
pub struct SessionManager<T> {
    // All known sessions, keyed by their identifier.
    sessions: RwLock<Table<Identifier, Session>>,
    // Hands out ports to sessions and takes them back when sessions are removed.
    port_allocator: Mutex<PortAllocator>,
    // Records the sessions corresponding to each assigned port, which will be needed when looking
    // up sessions assigned to this port based on the port number.
    port_mapping_table: RwLock<Table</* port */ u16, Identifier>>,
    // Stores the address to which the session should be forwarded when it sends indication to a
    // port. This is written when permissions are created to allow a certain address to be
    // forwarded to the current session.
    port_relay_table: RwLock<Table<Identifier, HashMap</* port */ u16, Endpoint>>>,
    // Indicates to which session the data sent by a session to a channel should be forwarded.
    channel_relay_table: RwLock<Table<Identifier, HashMap</* channel */ u16, Endpoint>>>,
    // Tick counter advanced once per second by the cleanup thread; session expiry values are
    // expressed in these ticks.
    timer: Timer,
    // External handler for password lookup and destroy notifications.
    handler: T,
}
190
191impl<T> SessionManager<T>
192where
193    T: ServiceHandler,
194{
195    pub fn new(options: SessionManagerOptions<T>) -> Arc<Self> {
196        let this = Arc::new(Self {
197            port_allocator: Mutex::new(PortAllocator::new(options.port_range)),
198            channel_relay_table: RwLock::new(Table::default()),
199            port_mapping_table: RwLock::new(Table::default()),
200            port_relay_table: RwLock::new(Table::default()),
201            sessions: RwLock::new(Table::default()),
202            timer: Timer::default(),
203            handler: options.handler,
204        });
205
206        // This is a background thread that silently handles expiring sessions and
207        // cleans up session information when it expires.
208        let this_ = Arc::downgrade(&this);
209        thread::spawn(move || {
210            let mut address = Vec::with_capacity(255);
211
212            while let Some(this) = this_.upgrade() {
213                // The timer advances one second and gets the current time offset.
214                let now = this.timer.add();
215
216                // This is the part that deletes the session information.
217                {
218                    // Finds sessions that have expired.
219                    {
220                        this.sessions
221                            .read()
222                            .iter()
223                            .filter(|(_, v)| match v {
224                                Session::New { expires, .. }
225                                | Session::Authenticated { expires, .. } => *expires <= now,
226                            })
227                            .for_each(|(k, _)| address.push(*k));
228                    }
229
230                    // Delete the expired sessions.
231                    if !address.is_empty() {
232                        this.remove_session(&address);
233                        address.clear();
234                    }
235                }
236
237                // Fixing a second tick.
238                sleep(Duration::from_secs(1));
239            }
240        });
241
242        this
243    }
244
245    fn remove_session(&self, addrs: &[Identifier]) {
246        let mut sessions = self.sessions.write();
247        let mut port_allocator = self.port_allocator.lock();
248        let mut port_mapping_table = self.port_mapping_table.write();
249        let mut port_relay_table = self.port_relay_table.write();
250        let mut channel_relay_table = self.channel_relay_table.write();
251
252        addrs.iter().for_each(|k| {
253            port_relay_table.remove(k);
254            channel_relay_table.remove(k);
255
256            if let Some(Session::Authenticated {
257                allocate_port,
258                username,
259                ..
260            }) = sessions.remove(k)
261            {
262                // Removes the session-bound port from the port binding table and
263                // releases the port back into the allocation pool.
264                if let Some(port) = allocate_port {
265                    port_mapping_table.remove(&port);
266                    port_allocator.restore(port);
267                }
268
269                // Notifies that the external session has been closed.
270                self.handler.on_destroy(k, &username);
271            }
272        });
273    }
274
275    /// Get session for addr.
276    ///
277    /// # Test
278    ///
279    /// ```
280    /// use turn_server::service::session::*;
281    /// use turn_server::service::*;
282    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
283    /// use turn_server::codec::crypto::Password;
284    /// use pollster::FutureExt;
285    ///
286    /// #[derive(Clone)]
287    /// struct ServiceHandlerTest;
288    ///
289    /// impl ServiceHandler for ServiceHandlerTest {
290    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
291    ///         if username == "test" {
292    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
293    ///         } else {
294    ///             None
295    ///         }
296    ///     }
297    /// }
298    ///
299    /// let addr = Identifier {
300    ///     source: "127.0.0.1:8080".parse().unwrap(),
301    ///     interface: "127.0.0.1:3478".parse().unwrap(),
302    /// };
303    ///
304    /// let digest = Password::Md5([
305    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
306    ///     239,
307    /// ]);
308    ///
309    /// let sessions = SessionManager::new(SessionManagerOptions {
310    ///     port_range: (49152..65535).into(),
311    ///     handler: ServiceHandlerTest,
312    /// });
313    ///
    /// // get_session only looks up an existing session; it never creates one
315    /// {
316    ///     assert!(sessions.get_session(&addr).get_ref().is_none());
317    /// }
318    ///
    /// // get_session_or_default creates a new session if it doesn't exist
320    /// {
321    ///     let lock = sessions.get_session_or_default(&addr);
322    ///     let session = lock.get_ref().unwrap();
323    ///     match session {
324    ///         Session::New { .. } => {},
325    ///         _ => panic!("Expected new session"),
326    ///     }
327    /// }
328    ///
329    /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
330    ///
331    /// {
332    ///     let lock = sessions.get_session(&addr);
333    ///     let session = lock.get_ref().unwrap();
334    ///     match session {
335    ///         Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
336    ///             assert_eq!(username, "test");
337    ///             assert_eq!(allocate_port, &None);
338    ///             assert_eq!(allocate_channels.len(), 0);
339    ///         }
340    ///         _ => panic!("Expected authenticated session"),
341    ///     }
342    /// }
343    /// ```
344    pub fn get_session_or_default<'a, 'b>(
345        &'a self,
346        key: &'b Identifier,
347    ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
348        {
349            let lock = self.sessions.read();
350            if lock.contains_key(key) {
351                return ReadLock {
352                    lock: self.sessions.read(),
353                    key,
354                };
355            }
356        }
357
358        {
359            self.sessions.write().insert(
360                *key,
361                Session::New {
362                    // A random string of length 16.
363                    nonce: make_nonce(),
364                    // Current time stacks for 600 seconds.
365                    expires: self.timer.get() + 600,
366                },
367            );
368        }
369
370        ReadLock {
371            lock: self.sessions.read(),
372            key,
373        }
374    }
375
376    pub fn get_session<'a, 'b>(
377        &'a self,
378        key: &'b Identifier,
379    ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
380        ReadLock {
381            lock: self.sessions.read(),
382            key,
383        }
384    }
385
386    /// Get digest for addr.
387    ///
388    /// # Test
389    ///
390    /// ```
391    /// use turn_server::service::session::*;
392    /// use turn_server::service::*;
393    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
394    /// use turn_server::codec::crypto::Password;
395    /// use pollster::FutureExt;
396    ///
397    /// #[derive(Clone)]
398    /// struct ServiceHandlerTest;
399    ///
400    /// impl ServiceHandler for ServiceHandlerTest {
401    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
402    ///         if username == "test" {
403    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
404    ///         } else {
405    ///             None
406    ///         }
407    ///     }
408    /// }
409    ///
410    /// let addr = Identifier {
411    ///     source: "127.0.0.1:8080".parse().unwrap(),
412    ///     interface: "127.0.0.1:3478".parse().unwrap(),
413    /// };
414    ///
415    /// let digest = Password::Md5([
416    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
417    ///     239,
418    /// ]);
419    ///
420    /// let sessions = SessionManager::new(SessionManagerOptions {
421    ///     port_range: (49152..65535).into(),
422    ///     handler: ServiceHandlerTest,
423    /// });
424    ///
    /// // get_session only performs a lookup; no session exists for addr yet
426    /// {
427    ///     sessions.get_session(&addr);
428    /// }
429    /// assert_eq!(pollster::block_on(sessions.get_password(&addr, "test1", PasswordAlgorithm::Md5)), None);
430    ///
    /// // Still no session: get_password creates it once the handler returns a password
432    /// {
433    ///     sessions.get_session(&addr);
434    /// }
435    /// assert_eq!(sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
436    ///
437    /// // The third call should return cached digest
438    /// assert_eq!(sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
439    /// ```
440    pub async fn get_password(
441        &self,
442        addr: &Identifier,
443        username: &str,
444        algorithm: PasswordAlgorithm,
445    ) -> Option<Password> {
446        // Already authenticated, get the cached digest directly.
447        {
448            if let Some(Session::Authenticated { password, .. }) = self.sessions.read().get(addr) {
449                return Some(*password);
450            }
451        }
452
453        // Get the current user's password from an external handler and create a
454        // digest.
455        let password = self.handler.get_password(username, algorithm).await?;
456
457        // Record a new session.
458        {
459            let mut lock = self.sessions.write();
460            let nonce = if let Some(Session::New { nonce, .. }) = lock.remove(addr) {
461                nonce
462            } else {
463                make_nonce()
464            };
465
466            lock.insert(
467                *addr,
468                Session::Authenticated {
469                    allocate_channels: Vec::with_capacity(10),
470                    permissions: Vec::with_capacity(10),
471                    expires: self.timer.get() + 600,
472                    username: username.to_string(),
473                    allocate_port: None,
474                    password,
475                    nonce,
476                },
477            );
478        }
479
480        Some(password)
481    }
482
483    pub fn allocated(&self) -> usize {
484        self.port_allocator.lock().len()
485    }
486
487    /// Assign a port number to the session.
488    ///
489    /// # Test
490    ///
491    /// ```
492    /// use turn_server::service::session::*;
493    /// use turn_server::service::*;
494    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
495    /// use turn_server::codec::crypto::Password;
496    /// use pollster::FutureExt;
497    ///
498    /// #[derive(Clone)]
499    /// struct ServiceHandlerTest;
500    ///
501    /// impl ServiceHandler for ServiceHandlerTest {
502    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
503    ///         if username == "test" {
504    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
505    ///         } else {
506    ///             None
507    ///         }
508    ///     }
509    /// }
510    ///
511    /// let addr = Identifier {
512    ///     source: "127.0.0.1:8080".parse().unwrap(),
513    ///     interface: "127.0.0.1:3478".parse().unwrap(),
514    /// };
515    ///
516    /// let digest = Password::Md5([
517    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
518    ///     239,
519    /// ]);
520    ///
521    /// let sessions = SessionManager::new(SessionManagerOptions {
522    ///     port_range: (49152..65535).into(),
523    ///     handler: ServiceHandlerTest,
524    /// });
525    ///
526    /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
527    ///
528    /// {
529    ///     let lock = sessions.get_session(&addr);
530    ///     let session = lock.get_ref().unwrap();
531    ///     match session {
532    ///         Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
533    ///             assert_eq!(username, "test");
534    ///             assert_eq!(allocate_port, &None);
535    ///             assert_eq!(allocate_channels.len(), 0);
536    ///         }
537    ///         _ => panic!("Expected authenticated session"),
538    ///     }
539    /// }
540    ///
541    /// let port = sessions.allocate(&addr, None).unwrap();
542    /// {
543    ///     let lock = sessions.get_session(&addr);
544    ///     let session = lock.get_ref().unwrap();
545    ///     match session {
546    ///         Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
547    ///             assert_eq!(username, "test");
548    ///             assert_eq!(allocate_port, &Some(port));
549    ///             assert_eq!(allocate_channels.len(), 0);
550    ///         }
551    ///         _ => panic!("Expected authenticated session"),
552    ///     }
553    /// }
554    ///
555    /// assert_eq!(sessions.allocate(&addr, None), Some(port));
556    /// ```
557    pub fn allocate(&self, addr: &Identifier, lifetime: Option<u32>) -> Option<u16> {
558        let mut lock = self.sessions.write();
559
560        if let Some(Session::Authenticated {
561            allocate_port,
562            expires,
563            ..
564        }) = lock.get_mut(addr)
565        {
566            // If the port has already been allocated, re-allocation is not allowed.
567            if let Some(port) = allocate_port {
568                return Some(*port);
569            }
570
571            // Records the port assigned to the current session and resets the alive time.
572            let port = self.port_allocator.lock().alloc(None)?;
573            *expires = self.timer.get() + (lifetime.unwrap_or(600) as u64);
574            *allocate_port = Some(port);
575
576            // Write the allocation port binding table.
577            self.port_mapping_table.write().insert(port, *addr);
578            Some(port)
579        } else {
580            None
581        }
582    }
583
584    /// Create permission for session.
585    ///
586    /// # Test
587    ///
588    /// ```
589    /// use turn_server::service::session::*;
590    /// use turn_server::service::*;
591    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
592    /// use turn_server::codec::crypto::Password;
593    /// use pollster::FutureExt;
594    ///
595    /// #[derive(Clone)]
596    /// struct ServiceHandlerTest;
597    ///
598    /// impl ServiceHandler for ServiceHandlerTest {
599    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
600    ///         if username == "test" {
601    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
602    ///         } else {
603    ///             None
604    ///         }
605    ///     }
606    /// }
607    ///
608    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
609    /// let addr = Identifier {
610    ///     source: "127.0.0.1:8080".parse().unwrap(),
611    ///     interface: "127.0.0.1:3478".parse().unwrap(),
612    /// };
613    ///
614    /// let peer_addr = Identifier {
615    ///     source: "127.0.0.1:8081".parse().unwrap(),
616    ///     interface: "127.0.0.1:3478".parse().unwrap(),
617    /// };
618    ///
619    /// let digest = Password::Md5([
620    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
621    ///     239,
622    /// ]);
623    ///
624    /// let sessions = SessionManager::new(SessionManagerOptions {
625    ///     port_range: (49152..65535).into(),
626    ///     handler: ServiceHandlerTest,
627    /// });
628    ///
629    /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
630    /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
631    ///
632    /// let port = sessions.allocate(&addr, None).unwrap();
633    /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
634    ///
635    /// assert!(!sessions.create_permission(&addr, &endpoint, &[port]));
636    /// assert!(sessions.create_permission(&addr, &endpoint, &[peer_port]));
637    ///
638    /// assert!(!sessions.create_permission(&peer_addr, &endpoint, &[peer_port]));
639    /// assert!(sessions.create_permission(&peer_addr, &endpoint, &[port]));
640    /// ```
641    pub fn create_permission(
642        &self,
643        addr: &Identifier,
644        endpoint: &SocketAddr,
645        ports: &[u16],
646    ) -> bool {
647        let mut sessions = self.sessions.write();
648        let mut port_relay_table = self.port_relay_table.write();
649        let port_mapping_table = self.port_mapping_table.read();
650
651        // Finds information about the current session.
652        if let Some(Session::Authenticated {
653            allocate_port,
654            permissions,
655            ..
656        }) = sessions.get_mut(addr)
657        {
658            // The port number assigned to the current session.
659            let local_port = if let Some(it) = allocate_port {
660                *it
661            } else {
662                return false;
663            };
664
665            // You cannot create permissions for yourself.
666            if ports.contains(&local_port) {
667                return false;
668            }
669
670            // Each peer port must be present.
671            let mut peers = Vec::with_capacity(15);
672            for port in ports {
673                if let Some(it) = port_mapping_table.get(port) {
674                    peers.push((it, *port));
675                } else {
676                    return false;
677                }
678            }
679
680            // Create a port forwarding mapping relationship for each peer session.
681            for (peer, port) in peers {
682                port_relay_table
683                    .entry(*peer)
684                    .or_insert_with(|| HashMap::with_capacity(20))
685                    .insert(
686                        local_port,
687                        Endpoint {
688                            source: addr.source,
689                            endpoint: *endpoint,
690                        },
691                    );
692
693                // Do not store the same peer ports to the permission list over and over again.
694                if !permissions.contains(&port) {
695                    permissions.push(port);
696                }
697            }
698
699            true
700        } else {
701            false
702        }
703    }
704
705    /// Binding a channel to the session.
706    ///
707    /// # Test
708    ///
709    /// ```
710    /// use turn_server::service::session::*;
711    /// use turn_server::service::*;
712    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
713    /// use turn_server::codec::crypto::Password;
714    /// use pollster::FutureExt;
715    ///
716    /// #[derive(Clone)]
717    /// struct ServiceHandlerTest;
718    ///
719    /// impl ServiceHandler for ServiceHandlerTest {
720    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
721    ///         if username == "test" {
722    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
723    ///         } else {
724    ///             None
725    ///         }
726    ///     }
727    /// }
728    ///
729    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
730    /// let addr = Identifier {
731    ///     source: "127.0.0.1:8080".parse().unwrap(),
732    ///     interface: "127.0.0.1:3478".parse().unwrap(),
733    /// };
734    ///
735    /// let peer_addr = Identifier {
736    ///     source: "127.0.0.1:8081".parse().unwrap(),
737    ///     interface: "127.0.0.1:3478".parse().unwrap(),
738    /// };
739    ///
740    /// let digest = Password::Md5([
741    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
742    ///     239,
743    /// ]);
744    ///
745    /// let sessions = SessionManager::new(SessionManagerOptions {
746    ///     port_range: (49152..65535).into(),
747    ///     handler: ServiceHandlerTest,
748    /// });
749    ///
750    /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
751    /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
752    ///
753    /// let port = sessions.allocate(&addr, None).unwrap();
754    /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
755    /// {
756    ///     assert_eq!(
757    ///         match sessions.get_session(&addr).get_ref().unwrap() {
758    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
759    ///             _ => panic!("Expected authenticated session"),
760    ///         },
761    ///         0
762    ///     );
763    /// }
764    ///
765    /// {
766    ///     assert_eq!(
767    ///         match sessions.get_session(&peer_addr).get_ref().unwrap() {
768    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
769    ///             _ => panic!("Expected authenticated session"),
770    ///         },
771    ///         0
772    ///     );
773    /// }
774    ///
775    /// assert!(sessions.bind_channel(&addr, &endpoint, peer_port, 0x4000));
776    /// assert!(sessions.bind_channel(&peer_addr, &endpoint, port, 0x4000));
777    ///
778    /// {
779    ///     assert_eq!(
780    ///         match sessions.get_session(&addr).get_ref().unwrap() {
781    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
782    ///             _ => panic!("Expected authenticated session"),
783    ///         },
784    ///         vec![0x4000]
785    ///     );
786    /// }
787    ///
788    /// {
789    ///     assert_eq!(
790    ///         match sessions.get_session(&peer_addr).get_ref().unwrap() {
791    ///             Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
792    ///             _ => panic!("Expected authenticated session"),
793    ///         },
794    ///         vec![0x4000]
795    ///     );
796    /// }
797    /// ```
798    pub fn bind_channel(
799        &self,
800        addr: &Identifier,
801        endpoint: &SocketAddr,
802        port: u16,
803        channel: u16,
804    ) -> bool {
805        // Finds the address of the bound opposing port.
806        let peer = if let Some(it) = self.port_mapping_table.read().get(&port) {
807            *it
808        } else {
809            return false;
810        };
811
812        // Records the channel used for the current session.
813        {
814            let mut lock = self.sessions.write();
815            if let Some(Session::Authenticated {
816                allocate_channels, ..
817            }) = lock.get_mut(addr)
818            {
819                if !allocate_channels.contains(&channel) {
820                    allocate_channels.push(channel);
821                }
822            } else {
823                return false;
824            };
825        }
826
827        // Binding ports also creates permissions.
828        if !self.create_permission(addr, endpoint, &[port]) {
829            return false;
830        }
831
832        // Create channel forwarding mapping relationships for peers.
833        self.channel_relay_table
834            .write()
835            .entry(peer)
836            .or_insert_with(|| HashMap::with_capacity(10))
837            .insert(
838                channel,
839                Endpoint {
840                    source: addr.source,
841                    endpoint: *endpoint,
842                },
843            );
844
845        true
846    }
847
848    /// Gets the peer of the current session bound channel.
849    ///
850    /// # Test
851    ///
852    /// ```
853    /// use turn_server::service::session::*;
854    /// use turn_server::service::*;
855    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
856    /// use turn_server::codec::crypto::Password;
857    /// use pollster::FutureExt;
858    ///
859    /// #[derive(Clone)]
860    /// struct ServiceHandlerTest;
861    ///
862    /// impl ServiceHandler for ServiceHandlerTest {
863    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
864    ///         if username == "test" {
865    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
866    ///         } else {
867    ///             None
868    ///         }
869    ///     }
870    /// }
871    ///
872    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
873    /// let addr = Identifier {
874    ///     source: "127.0.0.1:8080".parse().unwrap(),
875    ///     interface: "127.0.0.1:3478".parse().unwrap(),
876    /// };
877    ///
878    /// let peer_addr = Identifier {
879    ///     source: "127.0.0.1:8081".parse().unwrap(),
880    ///     interface: "127.0.0.1:3478".parse().unwrap(),
881    /// };
882    ///
883    /// let digest = Password::Md5([
884    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
885    ///     239,
886    /// ]);
887    ///
888    /// let sessions = SessionManager::new(SessionManagerOptions {
889    ///     port_range: (49152..65535).into(),
890    ///     handler: ServiceHandlerTest,
891    /// });
892    ///
893    /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
894    /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
895    ///
896    /// let port = sessions.allocate(&addr, None).unwrap();
897    /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
898    ///
899    /// assert!(sessions.bind_channel(&addr, &endpoint, peer_port, 0x4000));
900    /// assert!(sessions.bind_channel(&peer_addr, &endpoint, port, 0x4000));
901    /// assert_eq!(
902    ///     sessions
903    ///         .get_channel_relay_address(&addr, 0x4000)
904    ///         .unwrap()
905    ///         .endpoint,
906    ///     endpoint
907    /// );
908    ///
909    /// assert_eq!(
910    ///     sessions
911    ///         .get_channel_relay_address(&peer_addr, 0x4000)
912    ///         .unwrap()
913    ///         .endpoint,
914    ///     endpoint
915    /// );
916    /// ```
917    pub fn get_channel_relay_address(&self, addr: &Identifier, channel: u16) -> Option<Endpoint> {
918        self.channel_relay_table
919            .read()
920            .get(addr)?
921            .get(&channel)
922            .copied()
923    }
924
    /// Gets the relay endpoint recorded for the given port in the session
    /// identified by `addr`, or `None` if no such entry exists.
926    ///
927    /// # Test
928    ///
929    /// ```
930    /// use turn_server::service::session::*;
931    /// use turn_server::service::*;
932    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
933    /// use turn_server::codec::crypto::Password;
934    /// use pollster::FutureExt;
935    ///
936    /// #[derive(Clone)]
937    /// struct ServiceHandlerTest;
938    ///
939    /// impl ServiceHandler for ServiceHandlerTest {
940    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
941    ///         if username == "test" {
942    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
943    ///         } else {
944    ///             None
945    ///         }
946    ///     }
947    /// }
948    ///
949    /// let endpoint = "127.0.0.1:3478".parse().unwrap();
950    /// let addr = Identifier {
951    ///     source: "127.0.0.1:8080".parse().unwrap(),
952    ///     interface: "127.0.0.1:3478".parse().unwrap(),
953    /// };
954    ///
955    /// let peer_addr = Identifier {
956    ///     source: "127.0.0.1:8081".parse().unwrap(),
957    ///     interface: "127.0.0.1:3478".parse().unwrap(),
958    /// };
959    ///
960    /// let digest = Password::Md5([
961    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
962    ///     239,
963    /// ]);
964    ///
965    /// let sessions = SessionManager::new(SessionManagerOptions {
966    ///     port_range: (49152..65535).into(),
967    ///     handler: ServiceHandlerTest,
968    /// });
969    ///
970    /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
971    /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
972    ///
973    /// let port = sessions.allocate(&addr, None).unwrap();
974    /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
975    ///
976    /// assert!(sessions.create_permission(&addr, &endpoint, &[peer_port]));
977    /// assert!(sessions.create_permission(&peer_addr, &endpoint, &[port]));
978    ///
979    /// assert_eq!(
980    ///     sessions
981    ///         .get_relay_address(&addr, peer_port)
982    ///         .unwrap()
983    ///         .endpoint,
984    ///     endpoint
985    /// );
986    ///
987    /// assert_eq!(
988    ///     sessions
989    ///         .get_relay_address(&peer_addr, port)
990    ///         .unwrap()
991    ///         .endpoint,
992    ///     endpoint
993    /// );
994    /// ```
995    pub fn get_relay_address(&self, addr: &Identifier, port: u16) -> Option<Endpoint> {
996        self.port_relay_table.read().get(addr)?.get(&port).copied()
997    }
998
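    /// Refresh the session for addr.
    ///
    /// A `lifetime` of `0` removes the session and returns `true`. A `lifetime`
    /// greater than `3600` is rejected with `false`. Any other value moves the
    /// expiry of an authenticated session to the current timer value plus
    /// `lifetime`; `false` is returned when no authenticated session exists
    /// for `addr`.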
1000    ///
1001    /// # Test
1002    ///
1003    /// ```
1004    /// use turn_server::service::session::*;
1005    /// use turn_server::service::*;
1006    /// use turn_server::codec::message::attributes::PasswordAlgorithm;
1007    /// use turn_server::codec::crypto::Password;
1008    /// use pollster::FutureExt;
1009    ///
1010    /// #[derive(Clone)]
1011    /// struct ServiceHandlerTest;
1012    ///
1013    /// impl ServiceHandler for ServiceHandlerTest {
1014    ///     async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
1015    ///         if username == "test" {
1016    ///             Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
1017    ///         } else {
1018    ///             None
1019    ///         }
1020    ///     }
1021    /// }
1022    ///
1023    /// let addr = Identifier {
1024    ///     source: "127.0.0.1:8080".parse().unwrap(),
1025    ///     interface: "127.0.0.1:3478".parse().unwrap(),
1026    /// };
1027    ///
1028    /// let digest = Password::Md5([
1029    ///     174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
1030    ///     239,
1031    /// ]);
1032    ///
1033    /// let sessions = SessionManager::new(SessionManagerOptions {
1034    ///     port_range: (49152..65535).into(),
1035    ///     handler: ServiceHandlerTest,
1036    /// });
1037    ///
    /// // get_session_or_default creates a new session if one doesn't exist yet
1039    /// {
1040    ///     let lock = sessions.get_session_or_default(&addr);
1041    ///     let session = lock.get_ref().unwrap();
1042    ///     match session {
1043    ///         Session::New { .. } => {},
1044    ///         _ => panic!("Expected new session"),
1045    ///     }
1046    /// }
1047    ///
1048    /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
1049    ///
1050    /// {
1051    ///     let lock = sessions.get_session(&addr);
1052    ///     let expires = match lock.get_ref().unwrap() {
1053    ///         Session::Authenticated { expires, .. } => *expires,
1054    ///         _ => panic!("Expected authenticated session"),
1055    ///     };
1056    ///
1057    ///     assert!(expires == 600 || expires == 601 || expires == 602);
1058    /// }
1059    ///
1060    /// assert!(sessions.refresh(&addr, 0));
1061    ///
1062    /// // After refresh with lifetime 0, session should be removed
1063    /// {
1064    ///     assert!(sessions.get_session(&addr).get_ref().is_none());
1065    /// }
1066    /// ```
1067    pub fn refresh(&self, addr: &Identifier, lifetime: u32) -> bool {
1068        if lifetime > 3600 {
1069            return false;
1070        }
1071
1072        if lifetime == 0 {
1073            self.remove_session(&[*addr]);
1074        } else if let Some(Session::Authenticated { expires, .. }) =
1075            self.sessions.write().get_mut(addr)
1076        {
1077            *expires = self.timer.get() + lifetime as u64;
1078        } else {
1079            return false;
1080        }
1081
1082        true
1083    }
1084}
1085
1086/// Generate a cryptographically random nonce for STUN/TURN authentication.
1087///
1088/// The nonce is a critical security component in STUN/TURN's long-term credential
1089/// mechanism (RFC 5389). It serves multiple purposes:
1090/// - Prevents replay attacks by ensuring each authentication is unique
1091/// - Acts as a server-issued challenge in the digest authentication flow
1092/// - Binds authentication attempts to specific sessions
1093///
1094/// This implementation generates a 16-character alphanumeric string using a
1095/// cryptographically secure random number generator. The length is chosen to
1096/// provide sufficient entropy (approximately 95 bits) to make brute-force
1097/// attacks computationally infeasible while remaining well under the RFC's
1098/// 128-character limit.
1099///
1100/// # Returns
1101/// A random 16-character string containing alphanumeric characters [a-zA-Z0-9].
1102///
1103/// # Security
1104/// Uses `rand::rng()` which provides cryptographic-quality randomness suitable
1105/// for security-sensitive operations. See RFC 7616 Section 5.4 for additional
1106/// guidance on nonce value selection.
1107fn make_nonce() -> String {
1108    rand::rng()
1109        .sample_iter(&Alphanumeric)
1110        .take(16)
1111        .map(char::from)
1112        .collect()
1113}