webrtc_turn/allocation/
allocation_manager.rs1#[cfg(test)]
2mod allocation_manager_test;
3
4use super::*;
5use crate::errors::*;
6use crate::relay::*;
7
8use std::collections::HashMap;
9
10use util::{Conn, Error};
11
12pub struct ManagerConfig {
14 pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
15}
16
17pub struct Manager {
19 allocations: AllocationMap,
20 reservations: Arc<Mutex<HashMap<String, u16>>>,
21 relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
22}
23
24impl Manager {
25 pub fn new(config: ManagerConfig) -> Self {
27 Manager {
28 allocations: Arc::new(Mutex::new(HashMap::new())),
29 reservations: Arc::new(Mutex::new(HashMap::new())),
30 relay_addr_generator: config.relay_addr_generator,
31 }
32 }
33
34 pub async fn close(&self) -> Result<(), Error> {
36 let allocations = self.allocations.lock().await;
37 for a in allocations.values() {
38 let mut a = a.lock().await;
39 a.close().await?;
40 }
41 Ok(())
42 }
43
44 pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Mutex<Allocation>>> {
46 let allocations = self.allocations.lock().await;
47 if let Some(a) = allocations.get(&five_tuple.fingerprint()) {
48 Some(Arc::clone(a))
49 } else {
50 None
51 }
52 }
53
54 pub async fn create_allocation(
56 &self,
57 five_tuple: FiveTuple,
58 turn_socket: Arc<dyn Conn + Send + Sync>,
59 requested_port: u16,
60 lifetime: Duration,
61 ) -> Result<Arc<Mutex<Allocation>>, Error> {
62 if lifetime == Duration::from_secs(0) {
63 return Err(ERR_LIFETIME_ZERO.to_owned());
64 }
65
66 if self.get_allocation(&five_tuple).await.is_some() {
67 return Err(ERR_DUPE_FIVE_TUPLE.to_owned());
68 }
69
70 let (relay_socket, relay_addr) = self
71 .relay_addr_generator
72 .allocate_conn(true, requested_port)
73 .await?;
74 let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple.clone());
75 a.allocations = Some(Arc::clone(&self.allocations));
76
77 log::debug!("listening on relay addr: {:?}", a.relay_addr);
78 a.start(lifetime).await;
79 a.packet_handler().await;
80
81 let a = Arc::new(Mutex::new(a));
82 {
83 let mut allocations = self.allocations.lock().await;
84 allocations.insert(five_tuple.fingerprint(), Arc::clone(&a));
85 }
86
87 Ok(a)
88 }
89
90 pub async fn delete_allocation(&self, five_tuple: &FiveTuple) {
92 let fingerprint = five_tuple.fingerprint();
93
94 let mut allocations = self.allocations.lock().await;
95 let allocation = allocations.remove(&fingerprint);
96 if let Some(a) = allocation {
97 let mut a = a.lock().await;
98 if let Err(err) = a.close().await {
99 log::error!("Failed to close allocation: {}", err);
100 }
101 }
102 }
103
104 pub async fn create_reservation(&self, reservation_token: String, port: u16) {
106 let reservations = Arc::clone(&self.reservations);
107 let reservation_token2 = reservation_token.clone();
108
109 tokio::spawn(async move {
110 let sleep = tokio::time::sleep(Duration::from_secs(30));
111 tokio::pin!(sleep);
112 tokio::select! {
113 _ = &mut sleep => {
114 let mut reservations = reservations.lock().await;
115 reservations.remove(&reservation_token2);
116 },
117 }
118 });
119
120 let mut reservations = self.reservations.lock().await;
121 reservations.insert(reservation_token, port);
122 }
123
124 pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> {
126 let reservations = self.reservations.lock().await;
127 if let Some(port) = reservations.get(reservation_token) {
128 Some(*port)
129 } else {
130 None
131 }
132 }
133
134 pub async fn get_random_even_port(&self) -> Result<u16, Error> {
136 let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?;
137 Ok(addr.port())
138 }
139}