turn_server/service/session/mod.rs
1pub mod ports;
2
3use super::{
4 ServiceHandler,
5 session::ports::{PortAllocator, PortRange},
6};
7
8use crate::codec::{crypto::Password, message::attributes::PasswordAlgorithm};
9
10use std::{
11 hash::Hash,
12 net::SocketAddr,
13 ops::{Deref, DerefMut},
14 sync::{
15 Arc,
16 atomic::{AtomicU64, Ordering},
17 },
18 thread::{self, sleep},
19 time::Duration,
20};
21
22use ahash::{HashMap, HashMapExt};
23use parking_lot::{Mutex, RwLock, RwLockReadGuard};
24use rand::{Rng, distr::Alphanumeric};
25
26/// The identifier of the session or addr.
27///
28/// Each session needs to be identified by a combination of three pieces of
29/// information: the addr address, and the transport protocol.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
31pub struct Identifier {
32 pub source: SocketAddr,
33 pub interface: SocketAddr,
34}
35
36/// The addr used to record the current session.
37///
38/// This is used when forwarding data.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
40pub struct Endpoint {
41 pub source: SocketAddr,
42 pub endpoint: SocketAddr,
43}
44
45/// The default HashMap is created without allocating capacity. To improve
46/// performance, the turn server needs to pre-allocate the available capacity.
47///
48/// So here the HashMap is rewrapped to allocate a large capacity (number of
49/// ports that can be allocated) at the default creation time as well.
50pub struct Table<K, V>(HashMap<K, V>);
51
52impl<K, V> Default for Table<K, V> {
53 fn default() -> Self {
54 Self(HashMap::with_capacity(PortRange::default().size()))
55 }
56}
57
58impl<K, V> AsRef<HashMap<K, V>> for Table<K, V> {
59 fn as_ref(&self) -> &HashMap<K, V> {
60 &self.0
61 }
62}
63
64impl<K, V> Deref for Table<K, V> {
65 type Target = HashMap<K, V>;
66
67 fn deref(&self) -> &Self::Target {
68 &self.0
69 }
70}
71
72impl<K, V> DerefMut for Table<K, V> {
73 fn deref_mut(&mut self) -> &mut Self::Target {
74 &mut self.0
75 }
76}
77
78/// Used to lengthen the timing of the release of a readable lock guard and to
79/// provide a more convenient way for external access to the lock's internal
80/// data.
81pub struct ReadLock<'a, 'b, K, R> {
82 pub key: &'a K,
83 pub lock: RwLockReadGuard<'b, R>,
84}
85
86impl<'a, 'b, K, V> ReadLock<'a, 'b, K, Table<K, V>>
87where
88 K: Eq + Hash,
89{
90 pub fn get_ref(&self) -> Option<&V> {
91 self.lock.get(self.key)
92 }
93}
94
95/// A specially optimised timer.
96///
97/// This timer does not stack automatically and needs to be stacked externally
98/// and manually.
99///
100/// ```
101/// use turn_server::service::session::Timer;
102///
103/// let timer = Timer::default();
104///
105/// assert_eq!(timer.get(), 0);
106/// assert_eq!(timer.add(), 1);
107/// assert_eq!(timer.get(), 1);
108/// ```
109#[derive(Default)]
110pub struct Timer(AtomicU64);
111
112impl Timer {
113 pub fn get(&self) -> u64 {
114 self.0.load(Ordering::Relaxed)
115 }
116
117 pub fn add(&self) -> u64 {
118 self.0.fetch_add(1, Ordering::Relaxed) + 1
119 }
120}
121
122/// turn session information.
123///
124/// A user can have many sessions.
125///
126/// The default survival time for a session is 600 seconds.
127#[derive(Debug, Clone)]
128pub enum Session {
129 New {
130 nonce: String,
131 expires: u64,
132 },
133 Authenticated {
134 nonce: String,
135 /// Authentication information for the session.
136 ///
137 /// Digest data is data that summarises usernames and passwords by means of
138 /// long-term authentication.
139 username: String,
140 password: Password,
141 /// Assignment information for the session.
142 ///
143 /// SessionManager are all bound to only one port and one channel.
144 allocate_port: Option<u16>,
145 allocate_channels: Vec<u16>,
146 permissions: Vec<u16>,
147 expires: u64,
148 },
149}
150
151impl Session {
152 /// Get the nonce of the session.
153 pub fn nonce(&self) -> &str {
154 match self {
155 Session::New { nonce, .. } | Session::Authenticated { nonce, .. } => nonce,
156 }
157 }
158
159 /// Check if the session is a new session.
160 pub fn is_new(&self) -> bool {
161 matches!(self, Session::New { .. })
162 }
163
164 /// Check if the session is an authenticated session.
165 pub fn is_authenticated(&self) -> bool {
166 matches!(self, Session::Authenticated { .. })
167 }
168}
169
170pub struct SessionManagerOptions<T> {
171 pub port_range: PortRange,
172 pub handler: T,
173}
174
175pub struct SessionManager<T> {
176 sessions: RwLock<Table<Identifier, Session>>,
177 port_allocator: Mutex<PortAllocator>,
178 // Records the sessions corresponding to each assigned port, which will be needed when looking
179 // up sessions assigned to this port based on the port number.
180 port_mapping_table: RwLock<Table</* port */ u16, Identifier>>,
181 // Stores the address to which the session should be forwarded when it sends indication to a
182 // port. This is written when permissions are created to allow a certain address to be
183 // forwarded to the current session.
184 port_relay_table: RwLock<Table<Identifier, HashMap</* port */ u16, Endpoint>>>,
185 // Indicates to which session the data sent by a session to a channel should be forwarded.
186 channel_relay_table: RwLock<Table<Identifier, HashMap</* channel */ u16, Endpoint>>>,
187 timer: Timer,
188 handler: T,
189}
190
191impl<T> SessionManager<T>
192where
193 T: ServiceHandler,
194{
195 pub fn new(options: SessionManagerOptions<T>) -> Arc<Self> {
196 let this = Arc::new(Self {
197 port_allocator: Mutex::new(PortAllocator::new(options.port_range)),
198 channel_relay_table: RwLock::new(Table::default()),
199 port_mapping_table: RwLock::new(Table::default()),
200 port_relay_table: RwLock::new(Table::default()),
201 sessions: RwLock::new(Table::default()),
202 timer: Timer::default(),
203 handler: options.handler,
204 });
205
206 // This is a background thread that silently handles expiring sessions and
207 // cleans up session information when it expires.
208 let this_ = Arc::downgrade(&this);
209 thread::spawn(move || {
210 let mut address = Vec::with_capacity(255);
211
212 while let Some(this) = this_.upgrade() {
213 // The timer advances one second and gets the current time offset.
214 let now = this.timer.add();
215
216 // This is the part that deletes the session information.
217 {
218 // Finds sessions that have expired.
219 {
220 this.sessions
221 .read()
222 .iter()
223 .filter(|(_, v)| match v {
224 Session::New { expires, .. }
225 | Session::Authenticated { expires, .. } => *expires <= now,
226 })
227 .for_each(|(k, _)| address.push(*k));
228 }
229
230 // Delete the expired sessions.
231 if !address.is_empty() {
232 this.remove_session(&address);
233 address.clear();
234 }
235 }
236
237 // Fixing a second tick.
238 sleep(Duration::from_secs(1));
239 }
240 });
241
242 this
243 }
244
245 fn remove_session(&self, addrs: &[Identifier]) {
246 let mut sessions = self.sessions.write();
247 let mut port_allocator = self.port_allocator.lock();
248 let mut port_mapping_table = self.port_mapping_table.write();
249 let mut port_relay_table = self.port_relay_table.write();
250 let mut channel_relay_table = self.channel_relay_table.write();
251
252 addrs.iter().for_each(|k| {
253 port_relay_table.remove(k);
254 channel_relay_table.remove(k);
255
256 if let Some(Session::Authenticated {
257 allocate_port,
258 username,
259 ..
260 }) = sessions.remove(k)
261 {
262 // Removes the session-bound port from the port binding table and
263 // releases the port back into the allocation pool.
264 if let Some(port) = allocate_port {
265 port_mapping_table.remove(&port);
266 port_allocator.restore(port);
267 }
268
269 // Notifies that the external session has been closed.
270 self.handler.on_destroy(k, &username);
271 }
272 });
273 }
274
275 /// Get session for addr.
276 ///
277 /// # Test
278 ///
279 /// ```
280 /// use turn_server::service::session::*;
281 /// use turn_server::service::*;
282 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
283 /// use turn_server::codec::crypto::Password;
284 /// use pollster::FutureExt;
285 ///
286 /// #[derive(Clone)]
287 /// struct ServiceHandlerTest;
288 ///
289 /// impl ServiceHandler for ServiceHandlerTest {
290 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
291 /// if username == "test" {
292 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
293 /// } else {
294 /// None
295 /// }
296 /// }
297 /// }
298 ///
299 /// let addr = Identifier {
300 /// source: "127.0.0.1:8080".parse().unwrap(),
301 /// interface: "127.0.0.1:3478".parse().unwrap(),
302 /// };
303 ///
304 /// let digest = Password::Md5([
305 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
306 /// 239,
307 /// ]);
308 ///
309 /// let sessions = SessionManager::new(SessionManagerOptions {
310 /// port_range: (49152..65535).into(),
311 /// handler: ServiceHandlerTest,
312 /// });
313 ///
314 /// // get_session always creates a new session if it doesn't exist
315 /// {
316 /// assert!(sessions.get_session(&addr).get_ref().is_none());
317 /// }
318 ///
319 /// // get_session always creates a new session if it doesn't exist
320 /// {
321 /// let lock = sessions.get_session_or_default(&addr);
322 /// let session = lock.get_ref().unwrap();
323 /// match session {
324 /// Session::New { .. } => {},
325 /// _ => panic!("Expected new session"),
326 /// }
327 /// }
328 ///
329 /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
330 ///
331 /// {
332 /// let lock = sessions.get_session(&addr);
333 /// let session = lock.get_ref().unwrap();
334 /// match session {
335 /// Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
336 /// assert_eq!(username, "test");
337 /// assert_eq!(allocate_port, &None);
338 /// assert_eq!(allocate_channels.len(), 0);
339 /// }
340 /// _ => panic!("Expected authenticated session"),
341 /// }
342 /// }
343 /// ```
344 pub fn get_session_or_default<'a, 'b>(
345 &'a self,
346 key: &'b Identifier,
347 ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
348 {
349 let lock = self.sessions.read();
350 if lock.contains_key(key) {
351 return ReadLock {
352 lock: self.sessions.read(),
353 key,
354 };
355 }
356 }
357
358 {
359 self.sessions.write().insert(
360 *key,
361 Session::New {
362 // A random string of length 16.
363 nonce: make_nonce(),
364 // Current time stacks for 600 seconds.
365 expires: self.timer.get() + 600,
366 },
367 );
368 }
369
370 ReadLock {
371 lock: self.sessions.read(),
372 key,
373 }
374 }
375
376 pub fn get_session<'a, 'b>(
377 &'a self,
378 key: &'b Identifier,
379 ) -> ReadLock<'b, 'a, Identifier, Table<Identifier, Session>> {
380 ReadLock {
381 lock: self.sessions.read(),
382 key,
383 }
384 }
385
386 /// Get digest for addr.
387 ///
388 /// # Test
389 ///
390 /// ```
391 /// use turn_server::service::session::*;
392 /// use turn_server::service::*;
393 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
394 /// use turn_server::codec::crypto::Password;
395 /// use pollster::FutureExt;
396 ///
397 /// #[derive(Clone)]
398 /// struct ServiceHandlerTest;
399 ///
400 /// impl ServiceHandler for ServiceHandlerTest {
401 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
402 /// if username == "test" {
403 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
404 /// } else {
405 /// None
406 /// }
407 /// }
408 /// }
409 ///
410 /// let addr = Identifier {
411 /// source: "127.0.0.1:8080".parse().unwrap(),
412 /// interface: "127.0.0.1:3478".parse().unwrap(),
413 /// };
414 ///
415 /// let digest = Password::Md5([
416 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
417 /// 239,
418 /// ]);
419 ///
420 /// let sessions = SessionManager::new(SessionManagerOptions {
421 /// port_range: (49152..65535).into(),
422 /// handler: ServiceHandlerTest,
423 /// });
424 ///
425 /// // First call get_session to create a new session
426 /// {
427 /// sessions.get_session(&addr);
428 /// }
429 /// assert_eq!(pollster::block_on(sessions.get_password(&addr, "test1", PasswordAlgorithm::Md5)), None);
430 ///
431 /// // Create a new session for the next test
432 /// {
433 /// sessions.get_session(&addr);
434 /// }
435 /// assert_eq!(sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
436 ///
437 /// // The third call should return cached digest
438 /// assert_eq!(sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on(), Some(digest));
439 /// ```
440 pub async fn get_password(
441 &self,
442 addr: &Identifier,
443 username: &str,
444 algorithm: PasswordAlgorithm,
445 ) -> Option<Password> {
446 // Already authenticated, get the cached digest directly.
447 {
448 if let Some(Session::Authenticated { password, .. }) = self.sessions.read().get(addr) {
449 return Some(*password);
450 }
451 }
452
453 // Get the current user's password from an external handler and create a
454 // digest.
455 let password = self.handler.get_password(username, algorithm).await?;
456
457 // Record a new session.
458 {
459 let mut lock = self.sessions.write();
460 let nonce = if let Some(Session::New { nonce, .. }) = lock.remove(addr) {
461 nonce
462 } else {
463 make_nonce()
464 };
465
466 lock.insert(
467 *addr,
468 Session::Authenticated {
469 allocate_channels: Vec::with_capacity(10),
470 permissions: Vec::with_capacity(10),
471 expires: self.timer.get() + 600,
472 username: username.to_string(),
473 allocate_port: None,
474 password,
475 nonce,
476 },
477 );
478 }
479
480 Some(password)
481 }
482
483 pub fn allocated(&self) -> usize {
484 self.port_allocator.lock().len()
485 }
486
487 /// Assign a port number to the session.
488 ///
489 /// # Test
490 ///
491 /// ```
492 /// use turn_server::service::session::*;
493 /// use turn_server::service::*;
494 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
495 /// use turn_server::codec::crypto::Password;
496 /// use pollster::FutureExt;
497 ///
498 /// #[derive(Clone)]
499 /// struct ServiceHandlerTest;
500 ///
501 /// impl ServiceHandler for ServiceHandlerTest {
502 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
503 /// if username == "test" {
504 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
505 /// } else {
506 /// None
507 /// }
508 /// }
509 /// }
510 ///
511 /// let addr = Identifier {
512 /// source: "127.0.0.1:8080".parse().unwrap(),
513 /// interface: "127.0.0.1:3478".parse().unwrap(),
514 /// };
515 ///
516 /// let digest = Password::Md5([
517 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
518 /// 239,
519 /// ]);
520 ///
521 /// let sessions = SessionManager::new(SessionManagerOptions {
522 /// port_range: (49152..65535).into(),
523 /// handler: ServiceHandlerTest,
524 /// });
525 ///
526 /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
527 ///
528 /// {
529 /// let lock = sessions.get_session(&addr);
530 /// let session = lock.get_ref().unwrap();
531 /// match session {
532 /// Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
533 /// assert_eq!(username, "test");
534 /// assert_eq!(allocate_port, &None);
535 /// assert_eq!(allocate_channels.len(), 0);
536 /// }
537 /// _ => panic!("Expected authenticated session"),
538 /// }
539 /// }
540 ///
541 /// let port = sessions.allocate(&addr, None).unwrap();
542 /// {
543 /// let lock = sessions.get_session(&addr);
544 /// let session = lock.get_ref().unwrap();
545 /// match session {
546 /// Session::Authenticated { username, allocate_port, allocate_channels, .. } => {
547 /// assert_eq!(username, "test");
548 /// assert_eq!(allocate_port, &Some(port));
549 /// assert_eq!(allocate_channels.len(), 0);
550 /// }
551 /// _ => panic!("Expected authenticated session"),
552 /// }
553 /// }
554 ///
555 /// assert_eq!(sessions.allocate(&addr, None), Some(port));
556 /// ```
557 pub fn allocate(&self, addr: &Identifier, lifetime: Option<u32>) -> Option<u16> {
558 let mut lock = self.sessions.write();
559
560 if let Some(Session::Authenticated {
561 allocate_port,
562 expires,
563 ..
564 }) = lock.get_mut(addr)
565 {
566 // If the port has already been allocated, re-allocation is not allowed.
567 if let Some(port) = allocate_port {
568 return Some(*port);
569 }
570
571 // Records the port assigned to the current session and resets the alive time.
572 let port = self.port_allocator.lock().alloc(None)?;
573 *expires = self.timer.get() + (lifetime.unwrap_or(600) as u64);
574 *allocate_port = Some(port);
575
576 // Write the allocation port binding table.
577 self.port_mapping_table.write().insert(port, *addr);
578 Some(port)
579 } else {
580 None
581 }
582 }
583
584 /// Create permission for session.
585 ///
586 /// # Test
587 ///
588 /// ```
589 /// use turn_server::service::session::*;
590 /// use turn_server::service::*;
591 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
592 /// use turn_server::codec::crypto::Password;
593 /// use pollster::FutureExt;
594 ///
595 /// #[derive(Clone)]
596 /// struct ServiceHandlerTest;
597 ///
598 /// impl ServiceHandler for ServiceHandlerTest {
599 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
600 /// if username == "test" {
601 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
602 /// } else {
603 /// None
604 /// }
605 /// }
606 /// }
607 ///
608 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
609 /// let addr = Identifier {
610 /// source: "127.0.0.1:8080".parse().unwrap(),
611 /// interface: "127.0.0.1:3478".parse().unwrap(),
612 /// };
613 ///
614 /// let peer_addr = Identifier {
615 /// source: "127.0.0.1:8081".parse().unwrap(),
616 /// interface: "127.0.0.1:3478".parse().unwrap(),
617 /// };
618 ///
619 /// let digest = Password::Md5([
620 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
621 /// 239,
622 /// ]);
623 ///
624 /// let sessions = SessionManager::new(SessionManagerOptions {
625 /// port_range: (49152..65535).into(),
626 /// handler: ServiceHandlerTest,
627 /// });
628 ///
629 /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
630 /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
631 ///
632 /// let port = sessions.allocate(&addr, None).unwrap();
633 /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
634 ///
635 /// assert!(!sessions.create_permission(&addr, &endpoint, &[port]));
636 /// assert!(sessions.create_permission(&addr, &endpoint, &[peer_port]));
637 ///
638 /// assert!(!sessions.create_permission(&peer_addr, &endpoint, &[peer_port]));
639 /// assert!(sessions.create_permission(&peer_addr, &endpoint, &[port]));
640 /// ```
641 pub fn create_permission(
642 &self,
643 addr: &Identifier,
644 endpoint: &SocketAddr,
645 ports: &[u16],
646 ) -> bool {
647 let mut sessions = self.sessions.write();
648 let mut port_relay_table = self.port_relay_table.write();
649 let port_mapping_table = self.port_mapping_table.read();
650
651 // Finds information about the current session.
652 if let Some(Session::Authenticated {
653 allocate_port,
654 permissions,
655 ..
656 }) = sessions.get_mut(addr)
657 {
658 // The port number assigned to the current session.
659 let local_port = if let Some(it) = allocate_port {
660 *it
661 } else {
662 return false;
663 };
664
665 // You cannot create permissions for yourself.
666 if ports.contains(&local_port) {
667 return false;
668 }
669
670 // Each peer port must be present.
671 let mut peers = Vec::with_capacity(15);
672 for port in ports {
673 if let Some(it) = port_mapping_table.get(port) {
674 peers.push((it, *port));
675 } else {
676 return false;
677 }
678 }
679
680 // Create a port forwarding mapping relationship for each peer session.
681 for (peer, port) in peers {
682 port_relay_table
683 .entry(*peer)
684 .or_insert_with(|| HashMap::with_capacity(20))
685 .insert(
686 local_port,
687 Endpoint {
688 source: addr.source,
689 endpoint: *endpoint,
690 },
691 );
692
693 // Do not store the same peer ports to the permission list over and over again.
694 if !permissions.contains(&port) {
695 permissions.push(port);
696 }
697 }
698
699 true
700 } else {
701 false
702 }
703 }
704
705 /// Binding a channel to the session.
706 ///
707 /// # Test
708 ///
709 /// ```
710 /// use turn_server::service::session::*;
711 /// use turn_server::service::*;
712 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
713 /// use turn_server::codec::crypto::Password;
714 /// use pollster::FutureExt;
715 ///
716 /// #[derive(Clone)]
717 /// struct ServiceHandlerTest;
718 ///
719 /// impl ServiceHandler for ServiceHandlerTest {
720 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
721 /// if username == "test" {
722 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
723 /// } else {
724 /// None
725 /// }
726 /// }
727 /// }
728 ///
729 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
730 /// let addr = Identifier {
731 /// source: "127.0.0.1:8080".parse().unwrap(),
732 /// interface: "127.0.0.1:3478".parse().unwrap(),
733 /// };
734 ///
735 /// let peer_addr = Identifier {
736 /// source: "127.0.0.1:8081".parse().unwrap(),
737 /// interface: "127.0.0.1:3478".parse().unwrap(),
738 /// };
739 ///
740 /// let digest = Password::Md5([
741 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
742 /// 239,
743 /// ]);
744 ///
745 /// let sessions = SessionManager::new(SessionManagerOptions {
746 /// port_range: (49152..65535).into(),
747 /// handler: ServiceHandlerTest,
748 /// });
749 ///
750 /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
751 /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
752 ///
753 /// let port = sessions.allocate(&addr, None).unwrap();
754 /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
755 /// {
756 /// assert_eq!(
757 /// match sessions.get_session(&addr).get_ref().unwrap() {
758 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
759 /// _ => panic!("Expected authenticated session"),
760 /// },
761 /// 0
762 /// );
763 /// }
764 ///
765 /// {
766 /// assert_eq!(
767 /// match sessions.get_session(&peer_addr).get_ref().unwrap() {
768 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.len(),
769 /// _ => panic!("Expected authenticated session"),
770 /// },
771 /// 0
772 /// );
773 /// }
774 ///
775 /// assert!(sessions.bind_channel(&addr, &endpoint, peer_port, 0x4000));
776 /// assert!(sessions.bind_channel(&peer_addr, &endpoint, port, 0x4000));
777 ///
778 /// {
779 /// assert_eq!(
780 /// match sessions.get_session(&addr).get_ref().unwrap() {
781 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
782 /// _ => panic!("Expected authenticated session"),
783 /// },
784 /// vec![0x4000]
785 /// );
786 /// }
787 ///
788 /// {
789 /// assert_eq!(
790 /// match sessions.get_session(&peer_addr).get_ref().unwrap() {
791 /// Session::Authenticated { allocate_channels, .. } => allocate_channels.clone(),
792 /// _ => panic!("Expected authenticated session"),
793 /// },
794 /// vec![0x4000]
795 /// );
796 /// }
797 /// ```
798 pub fn bind_channel(
799 &self,
800 addr: &Identifier,
801 endpoint: &SocketAddr,
802 port: u16,
803 channel: u16,
804 ) -> bool {
805 // Finds the address of the bound opposing port.
806 let peer = if let Some(it) = self.port_mapping_table.read().get(&port) {
807 *it
808 } else {
809 return false;
810 };
811
812 // Records the channel used for the current session.
813 {
814 let mut lock = self.sessions.write();
815 if let Some(Session::Authenticated {
816 allocate_channels, ..
817 }) = lock.get_mut(addr)
818 {
819 if !allocate_channels.contains(&channel) {
820 allocate_channels.push(channel);
821 }
822 } else {
823 return false;
824 };
825 }
826
827 // Binding ports also creates permissions.
828 if !self.create_permission(addr, endpoint, &[port]) {
829 return false;
830 }
831
832 // Create channel forwarding mapping relationships for peers.
833 self.channel_relay_table
834 .write()
835 .entry(peer)
836 .or_insert_with(|| HashMap::with_capacity(10))
837 .insert(
838 channel,
839 Endpoint {
840 source: addr.source,
841 endpoint: *endpoint,
842 },
843 );
844
845 true
846 }
847
848 /// Gets the peer of the current session bound channel.
849 ///
850 /// # Test
851 ///
852 /// ```
853 /// use turn_server::service::session::*;
854 /// use turn_server::service::*;
855 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
856 /// use turn_server::codec::crypto::Password;
857 /// use pollster::FutureExt;
858 ///
859 /// #[derive(Clone)]
860 /// struct ServiceHandlerTest;
861 ///
862 /// impl ServiceHandler for ServiceHandlerTest {
863 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
864 /// if username == "test" {
865 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
866 /// } else {
867 /// None
868 /// }
869 /// }
870 /// }
871 ///
872 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
873 /// let addr = Identifier {
874 /// source: "127.0.0.1:8080".parse().unwrap(),
875 /// interface: "127.0.0.1:3478".parse().unwrap(),
876 /// };
877 ///
878 /// let peer_addr = Identifier {
879 /// source: "127.0.0.1:8081".parse().unwrap(),
880 /// interface: "127.0.0.1:3478".parse().unwrap(),
881 /// };
882 ///
883 /// let digest = Password::Md5([
884 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
885 /// 239,
886 /// ]);
887 ///
888 /// let sessions = SessionManager::new(SessionManagerOptions {
889 /// port_range: (49152..65535).into(),
890 /// handler: ServiceHandlerTest,
891 /// });
892 ///
893 /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
894 /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
895 ///
896 /// let port = sessions.allocate(&addr, None).unwrap();
897 /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
898 ///
899 /// assert!(sessions.bind_channel(&addr, &endpoint, peer_port, 0x4000));
900 /// assert!(sessions.bind_channel(&peer_addr, &endpoint, port, 0x4000));
901 /// assert_eq!(
902 /// sessions
903 /// .get_channel_relay_address(&addr, 0x4000)
904 /// .unwrap()
905 /// .endpoint,
906 /// endpoint
907 /// );
908 ///
909 /// assert_eq!(
910 /// sessions
911 /// .get_channel_relay_address(&peer_addr, 0x4000)
912 /// .unwrap()
913 /// .endpoint,
914 /// endpoint
915 /// );
916 /// ```
917 pub fn get_channel_relay_address(&self, addr: &Identifier, channel: u16) -> Option<Endpoint> {
918 self.channel_relay_table
919 .read()
920 .get(addr)?
921 .get(&channel)
922 .copied()
923 }
924
925 /// Get the address of the port binding.
926 ///
927 /// # Test
928 ///
929 /// ```
930 /// use turn_server::service::session::*;
931 /// use turn_server::service::*;
932 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
933 /// use turn_server::codec::crypto::Password;
934 /// use pollster::FutureExt;
935 ///
936 /// #[derive(Clone)]
937 /// struct ServiceHandlerTest;
938 ///
939 /// impl ServiceHandler for ServiceHandlerTest {
940 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
941 /// if username == "test" {
942 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
943 /// } else {
944 /// None
945 /// }
946 /// }
947 /// }
948 ///
949 /// let endpoint = "127.0.0.1:3478".parse().unwrap();
950 /// let addr = Identifier {
951 /// source: "127.0.0.1:8080".parse().unwrap(),
952 /// interface: "127.0.0.1:3478".parse().unwrap(),
953 /// };
954 ///
955 /// let peer_addr = Identifier {
956 /// source: "127.0.0.1:8081".parse().unwrap(),
957 /// interface: "127.0.0.1:3478".parse().unwrap(),
958 /// };
959 ///
960 /// let digest = Password::Md5([
961 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
962 /// 239,
963 /// ]);
964 ///
965 /// let sessions = SessionManager::new(SessionManagerOptions {
966 /// port_range: (49152..65535).into(),
967 /// handler: ServiceHandlerTest,
968 /// });
969 ///
970 /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
971 /// sessions.get_password(&peer_addr, "test", PasswordAlgorithm::Md5).block_on();
972 ///
973 /// let port = sessions.allocate(&addr, None).unwrap();
974 /// let peer_port = sessions.allocate(&peer_addr, None).unwrap();
975 ///
976 /// assert!(sessions.create_permission(&addr, &endpoint, &[peer_port]));
977 /// assert!(sessions.create_permission(&peer_addr, &endpoint, &[port]));
978 ///
979 /// assert_eq!(
980 /// sessions
981 /// .get_relay_address(&addr, peer_port)
982 /// .unwrap()
983 /// .endpoint,
984 /// endpoint
985 /// );
986 ///
987 /// assert_eq!(
988 /// sessions
989 /// .get_relay_address(&peer_addr, port)
990 /// .unwrap()
991 /// .endpoint,
992 /// endpoint
993 /// );
994 /// ```
995 pub fn get_relay_address(&self, addr: &Identifier, port: u16) -> Option<Endpoint> {
996 self.port_relay_table.read().get(addr)?.get(&port).copied()
997 }
998
999 /// Refresh the session for addr.
1000 ///
1001 /// # Test
1002 ///
1003 /// ```
1004 /// use turn_server::service::session::*;
1005 /// use turn_server::service::*;
1006 /// use turn_server::codec::message::attributes::PasswordAlgorithm;
1007 /// use turn_server::codec::crypto::Password;
1008 /// use pollster::FutureExt;
1009 ///
1010 /// #[derive(Clone)]
1011 /// struct ServiceHandlerTest;
1012 ///
1013 /// impl ServiceHandler for ServiceHandlerTest {
1014 /// async fn get_password(&self, username: &str, algorithm: PasswordAlgorithm) -> Option<Password> {
1015 /// if username == "test" {
1016 /// Some(turn_server::codec::crypto::generate_password(username, "test", "test", algorithm))
1017 /// } else {
1018 /// None
1019 /// }
1020 /// }
1021 /// }
1022 ///
1023 /// let addr = Identifier {
1024 /// source: "127.0.0.1:8080".parse().unwrap(),
1025 /// interface: "127.0.0.1:3478".parse().unwrap(),
1026 /// };
1027 ///
1028 /// let digest = Password::Md5([
1029 /// 174, 238, 187, 253, 117, 209, 73, 157, 36, 56, 143, 91, 155, 16, 224,
1030 /// 239,
1031 /// ]);
1032 ///
1033 /// let sessions = SessionManager::new(SessionManagerOptions {
1034 /// port_range: (49152..65535).into(),
1035 /// handler: ServiceHandlerTest,
1036 /// });
1037 ///
1038 /// // get_session always creates a new session if it doesn't exist
1039 /// {
1040 /// let lock = sessions.get_session_or_default(&addr);
1041 /// let session = lock.get_ref().unwrap();
1042 /// match session {
1043 /// Session::New { .. } => {},
1044 /// _ => panic!("Expected new session"),
1045 /// }
1046 /// }
1047 ///
1048 /// sessions.get_password(&addr, "test", PasswordAlgorithm::Md5).block_on();
1049 ///
1050 /// {
1051 /// let lock = sessions.get_session(&addr);
1052 /// let expires = match lock.get_ref().unwrap() {
1053 /// Session::Authenticated { expires, .. } => *expires,
1054 /// _ => panic!("Expected authenticated session"),
1055 /// };
1056 ///
1057 /// assert!(expires == 600 || expires == 601 || expires == 602);
1058 /// }
1059 ///
1060 /// assert!(sessions.refresh(&addr, 0));
1061 ///
1062 /// // After refresh with lifetime 0, session should be removed
1063 /// {
1064 /// assert!(sessions.get_session(&addr).get_ref().is_none());
1065 /// }
1066 /// ```
1067 pub fn refresh(&self, addr: &Identifier, lifetime: u32) -> bool {
1068 if lifetime > 3600 {
1069 return false;
1070 }
1071
1072 if lifetime == 0 {
1073 self.remove_session(&[*addr]);
1074 } else if let Some(Session::Authenticated { expires, .. }) =
1075 self.sessions.write().get_mut(addr)
1076 {
1077 *expires = self.timer.get() + lifetime as u64;
1078 } else {
1079 return false;
1080 }
1081
1082 true
1083 }
1084}
1085
1086/// Generate a cryptographically random nonce for STUN/TURN authentication.
1087///
1088/// The nonce is a critical security component in STUN/TURN's long-term credential
1089/// mechanism (RFC 5389). It serves multiple purposes:
1090/// - Prevents replay attacks by ensuring each authentication is unique
1091/// - Acts as a server-issued challenge in the digest authentication flow
1092/// - Binds authentication attempts to specific sessions
1093///
1094/// This implementation generates a 16-character alphanumeric string using a
1095/// cryptographically secure random number generator. The length is chosen to
1096/// provide sufficient entropy (approximately 95 bits) to make brute-force
1097/// attacks computationally infeasible while remaining well under the RFC's
1098/// 128-character limit.
1099///
1100/// # Returns
1101/// A random 16-character string containing alphanumeric characters [a-zA-Z0-9].
1102///
1103/// # Security
1104/// Uses `rand::rng()` which provides cryptographic-quality randomness suitable
1105/// for security-sensitive operations. See RFC 7616 Section 5.4 for additional
1106/// guidance on nonce value selection.
1107fn make_nonce() -> String {
1108 rand::rng()
1109 .sample_iter(&Alphanumeric)
1110 .take(16)
1111 .map(char::from)
1112 .collect()
1113}