turn_server/router.rs
1use std::{net::SocketAddr, sync::Arc};
2
3use ahash::AHashMap;
4use parking_lot::RwLock;
5use tokio::sync::mpsc::*;
6use turn::ResponseMethod;
7
8type Receiver = UnboundedSender<(Vec<u8>, ResponseMethod, SocketAddr)>;
9
10/// Handles packet forwarding between transport protocols.
11#[derive(Clone)]
12pub struct Router(Arc<RwLock<AHashMap<SocketAddr, Receiver>>>);
13
14impl Default for Router {
15 fn default() -> Self {
16 Self(Arc::new(RwLock::new(AHashMap::with_capacity(1024))))
17 }
18}
19
20impl Router {
21 /// Get the socket reader for the route.
22 ///
23 /// Each transport protocol is layered according to its own socket, and
24 /// the data forwarded to this socket can be obtained by routing.
25 ///
26 /// # Example
27 ///
28 /// ```
29 /// use std::net::SocketAddr;
30 /// use turn::ResponseMethod;
31 /// use turn_server::router::*;
32 ///
33 /// #[tokio::main]
34 /// async fn main() {
35 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
36 /// let router = Router::default();
37 /// let mut receiver = router.get_receiver(addr);
38 ///
39 /// router.send(&addr, ResponseMethod::ChannelData, &addr, &[1, 2, 3]);
40 /// let ret = receiver.recv().await.unwrap();
41 /// assert_eq!(ret.0, vec![1, 2, 3]);
42 /// assert_eq!(ret.1, ResponseMethod::ChannelData);
43 /// assert_eq!(ret.2, addr);
44 /// }
45 /// ```
46 pub fn get_receiver(&self, interface: SocketAddr) -> UnboundedReceiver<(Vec<u8>, ResponseMethod, SocketAddr)> {
47 let (sender, receiver) = unbounded_channel();
48 self.0.write().insert(interface, sender);
49 receiver
50 }
51
52 /// Send data to router.
53 ///
54 /// By specifying the socket identifier and destination address, the route
55 /// is forwarded to the corresponding socket. However, it should be noted
56 /// that calling this function will not notify whether the socket exists.
57 /// If it does not exist, the data will be discarded by default.
58 ///
59 /// # Example
60 ///
61 /// ```
62 /// use std::net::SocketAddr;
63 /// use turn::ResponseMethod;
64 /// use turn_server::router::*;
65 ///
66 /// #[tokio::main]
67 /// async fn main() {
68 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
69 /// let router = Router::default();
70 /// let mut receiver = router.get_receiver(addr);
71 ///
72 /// router.send(&addr, ResponseMethod::ChannelData, &addr, &[1, 2, 3]);
73 /// let ret = receiver.recv().await.unwrap();
74 /// assert_eq!(ret.0, vec![1, 2, 3]);
75 /// assert_eq!(ret.1, ResponseMethod::ChannelData);
76 /// assert_eq!(ret.2, addr);
77 /// }
78 /// ```
79 pub fn send(&self, interface: &SocketAddr, method: ResponseMethod, addr: &SocketAddr, data: &[u8]) {
80 let mut is_destroy = false;
81
82 {
83 if let Some(sender) = self.0.read().get(interface) {
84 if sender.send((data.to_vec(), method, *addr)).is_err() {
85 is_destroy = true;
86 }
87 }
88 }
89
90 if is_destroy {
91 self.remove(interface);
92 }
93 }
94
95 /// delete socket.
96 ///
97 /// # Example
98 ///
99 /// ```
100 /// use std::net::SocketAddr;
101 /// use turn::ResponseMethod;
102 /// use turn_server::router::*;
103 ///
104 /// #[tokio::main]
105 /// async fn main() {
106 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
107 /// let router = Router::default();
108 /// let mut receiver = router.get_receiver(addr);
109 ///
110 /// router.send(&addr, ResponseMethod::ChannelData, &addr, &[1, 2, 3]);
111 /// let ret = receiver.recv().await.unwrap();
112 /// assert_eq!(ret.0, vec![1, 2, 3]);
113 /// assert_eq!(ret.1, ResponseMethod::ChannelData);
114 /// assert_eq!(ret.2, addr);
115 ///
116 /// router.remove(&addr);
117 /// assert!(receiver.recv().await.is_none());
118 /// }
119 /// ```
120 pub fn remove(&self, interface: &SocketAddr) {
121 drop(self.0.write().remove(interface))
122 }
123}