clia_turn/allocation/
allocation_manager.rs1#[cfg(test)]
2mod allocation_manager_test;
3
4use std::collections::HashMap;
5
6use futures::future;
7use stun::textattrs::Username;
8use tokio::sync::mpsc;
9use util::Conn;
10
11use super::*;
12use crate::error::*;
13use crate::relay::*;
14
15pub struct ManagerConfig {
17 pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
18 pub alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
19}
20
21pub struct Manager {
23 allocations: Arc<Mutex<AllocationMap>>,
24 reservations: Arc<Mutex<HashMap<String, u16>>>,
25 relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
26 alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
27}
28
29impl Manager {
30 pub fn new(config: ManagerConfig) -> Self {
32 Manager {
33 allocations: Arc::new(Mutex::new(HashMap::new())),
34 reservations: Arc::new(Mutex::new(HashMap::new())),
35 relay_addr_generator: config.relay_addr_generator,
36 alloc_close_notify: config.alloc_close_notify,
37 }
38 }
39
40 pub async fn close(&self) -> Result<()> {
42 let allocations = self.allocations.lock().await;
43 for a in allocations.values() {
44 a.close().await?;
45 }
46 Ok(())
47 }
48
49 pub async fn get_allocations_info(
52 &self,
53 five_tuples: Option<Vec<FiveTuple>>,
54 ) -> HashMap<FiveTuple, AllocationInfo> {
55 let mut infos = HashMap::new();
56
57 let guarded = self.allocations.lock().await;
58
59 guarded.iter().for_each(|(five_tuple, alloc)| {
60 if five_tuples.is_none() || five_tuples.as_ref().unwrap().contains(five_tuple) {
61 infos.insert(
62 *five_tuple,
63 AllocationInfo::new(
64 *five_tuple,
65 alloc.username.text.clone(),
66 alloc.relay_addr,
67 #[cfg(feature = "metrics")]
68 alloc.relayed_bytes.load(Ordering::Acquire),
69 ),
70 );
71 }
72 });
73
74 infos
75 }
76
77 pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> {
79 let allocations = self.allocations.lock().await;
80 allocations.get(five_tuple).cloned()
81 }
82
83 pub async fn create_allocation(
85 &self,
86 five_tuple: FiveTuple,
87 turn_socket: Arc<dyn Conn + Send + Sync>,
88 requested_port: u16,
89 lifetime: Duration,
90 username: Username,
91 use_ipv4: bool,
92 ) -> Result<Arc<Allocation>> {
93 if lifetime == Duration::from_secs(0) {
94 return Err(Error::ErrLifetimeZero);
95 }
96
97 if self.get_allocation(&five_tuple).await.is_some() {
98 return Err(Error::ErrDupeFiveTuple);
99 }
100
101 let (relay_socket, relay_addr) = self
102 .relay_addr_generator
103 .allocate_conn(use_ipv4, requested_port)
104 .await?;
105 let mut a = Allocation::new(
106 turn_socket,
107 relay_socket,
108 relay_addr,
109 five_tuple,
110 username,
111 Arc::downgrade(&self.allocations),
112 self.alloc_close_notify.clone(),
113 );
114
115 log::debug!("listening on relay addr: {:?}", a.relay_addr);
116 a.start(lifetime).await;
117 a.packet_handler().await;
118
119 let a = Arc::new(a);
120 {
121 let mut allocations = self.allocations.lock().await;
122 allocations.insert(five_tuple, Arc::clone(&a));
123 }
124
125 Ok(a)
126 }
127
128 pub async fn delete_allocation(&self, five_tuple: &FiveTuple) {
130 let allocation = self.allocations.lock().await.remove(five_tuple);
131
132 if let Some(a) = allocation {
133 if let Err(err) = a.close().await {
134 log::error!("Failed to close allocation: {}", err);
135 }
136 }
137 }
138
139 pub async fn delete_allocations_by_username(&self, name: &str) {
141 let to_delete = {
142 let mut allocations = self.allocations.lock().await;
143
144 let mut to_delete = Vec::new();
145
146 allocations.retain(|_, allocation| {
148 let match_name = allocation.username.text == name;
149
150 if match_name {
151 to_delete.push(Arc::clone(allocation));
152 }
153
154 !match_name
155 });
156
157 to_delete
158 };
159
160 future::join_all(to_delete.iter().map(|a| async move {
161 if let Err(err) = a.close().await {
162 log::error!("Failed to close allocation: {}", err);
163 }
164 }))
165 .await;
166 }
167
168 pub async fn create_reservation(&self, reservation_token: String, port: u16) {
170 let reservations = Arc::clone(&self.reservations);
171 let reservation_token2 = reservation_token.clone();
172
173 tokio::spawn(async move {
174 let sleep = tokio::time::sleep(Duration::from_secs(30));
175 tokio::pin!(sleep);
176 tokio::select! {
177 _ = &mut sleep => {
178 let mut reservations = reservations.lock().await;
179 reservations.remove(&reservation_token2);
180 },
181 }
182 });
183
184 let mut reservations = self.reservations.lock().await;
185 reservations.insert(reservation_token, port);
186 }
187
188 pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> {
190 let reservations = self.reservations.lock().await;
191 reservations.get(reservation_token).copied()
192 }
193
194 pub async fn get_random_even_port(&self) -> Result<u16> {
196 let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?;
197 Ok(addr.port())
198 }
199}