turn_server/
router.rs

1use std::{net::SocketAddr, sync::Arc};
2
3use ahash::AHashMap;
4use parking_lot::RwLock;
5use tokio::sync::mpsc::*;
6use turn::ResponseMethod;
7
8type Receiver = UnboundedSender<(Vec<u8>, ResponseMethod, SocketAddr)>;
9
10/// Handles packet forwarding between transport protocols.
11#[derive(Clone)]
12pub struct Router(Arc<RwLock<AHashMap<SocketAddr, Receiver>>>);
13
14impl Default for Router {
15    fn default() -> Self {
16        Self(Arc::new(RwLock::new(AHashMap::with_capacity(1024))))
17    }
18}
19
20impl Router {
21    /// Get the socket reader for the route.
22    ///
23    /// Each transport protocol is layered according to its own socket, and
24    /// the data forwarded to this socket can be obtained by routing.
25    ///
26    /// # Example
27    ///
28    /// ```
29    /// use std::net::SocketAddr;
30    /// use turn::ResponseMethod;
31    /// use turn_server::router::*;
32    ///
33    /// #[tokio::main]
34    /// async fn main() {
35    ///     let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
36    ///     let router = Router::default();
37    ///     let mut receiver = router.get_receiver(addr);
38    ///
39    ///     router.send(&addr, ResponseMethod::ChannelData, &addr, &[1, 2, 3]);
40    ///     let ret = receiver.recv().await.unwrap();
41    ///     assert_eq!(ret.0, vec![1, 2, 3]);
42    ///     assert_eq!(ret.1, ResponseMethod::ChannelData);
43    ///     assert_eq!(ret.2, addr);
44    /// }
45    /// ```
46    pub fn get_receiver(&self, interface: SocketAddr) -> UnboundedReceiver<(Vec<u8>, ResponseMethod, SocketAddr)> {
47        let (sender, receiver) = unbounded_channel();
48        self.0.write().insert(interface, sender);
49        receiver
50    }
51
52    /// Send data to router.
53    ///
54    /// By specifying the socket identifier and destination address, the route
55    /// is forwarded to the corresponding socket. However, it should be noted
56    /// that calling this function will not notify whether the socket exists.
57    /// If it does not exist, the data will be discarded by default.
58    ///
59    /// # Example
60    ///
61    /// ```
62    /// use std::net::SocketAddr;
63    /// use turn::ResponseMethod;
64    /// use turn_server::router::*;
65    ///
66    /// #[tokio::main]
67    /// async fn main() {
68    ///     let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
69    ///     let router = Router::default();
70    ///     let mut receiver = router.get_receiver(addr);
71    ///
72    ///     router.send(&addr, ResponseMethod::ChannelData, &addr, &[1, 2, 3]);
73    ///     let ret = receiver.recv().await.unwrap();
74    ///     assert_eq!(ret.0, vec![1, 2, 3]);
75    ///     assert_eq!(ret.1, ResponseMethod::ChannelData);
76    ///     assert_eq!(ret.2, addr);
77    /// }
78    /// ```
79    pub fn send(&self, interface: &SocketAddr, method: ResponseMethod, addr: &SocketAddr, data: &[u8]) {
80        let mut is_destroy = false;
81
82        {
83            if let Some(sender) = self.0.read().get(interface) {
84                if sender.send((data.to_vec(), method, *addr)).is_err() {
85                    is_destroy = true;
86                }
87            }
88        }
89
90        if is_destroy {
91            self.remove(interface);
92        }
93    }
94
95    /// delete socket.
96    ///
97    /// # Example
98    ///
99    /// ```
100    /// use std::net::SocketAddr;
101    /// use turn::ResponseMethod;
102    /// use turn_server::router::*;
103    ///
104    /// #[tokio::main]
105    /// async fn main() {
106    ///     let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
107    ///     let router = Router::default();
108    ///     let mut receiver = router.get_receiver(addr);
109    ///
110    ///     router.send(&addr, ResponseMethod::ChannelData, &addr, &[1, 2, 3]);
111    ///     let ret = receiver.recv().await.unwrap();
112    ///     assert_eq!(ret.0, vec![1, 2, 3]);
113    ///     assert_eq!(ret.1, ResponseMethod::ChannelData);
114    ///     assert_eq!(ret.2, addr);
115    ///
116    ///     router.remove(&addr);
117    ///     assert!(receiver.recv().await.is_none());
118    /// }
119    /// ```
120    pub fn remove(&self, interface: &SocketAddr) {
121        drop(self.0.write().remove(interface))
122    }
123}