webrtc_turn/allocation/
allocation_manager.rs

1#[cfg(test)]
2mod allocation_manager_test;
3
4use super::*;
5use crate::errors::*;
6use crate::relay::*;
7
8use std::collections::HashMap;
9
10use util::{Conn, Error};
11
12// ManagerConfig a bag of config params for Manager.
13pub struct ManagerConfig {
14    pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
15}
16
17// Manager is used to hold active allocations
18pub struct Manager {
19    allocations: AllocationMap,
20    reservations: Arc<Mutex<HashMap<String, u16>>>,
21    relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
22}
23
24impl Manager {
25    // creates a new instance of Manager.
26    pub fn new(config: ManagerConfig) -> Self {
27        Manager {
28            allocations: Arc::new(Mutex::new(HashMap::new())),
29            reservations: Arc::new(Mutex::new(HashMap::new())),
30            relay_addr_generator: config.relay_addr_generator,
31        }
32    }
33
34    // Close closes the manager and closes all allocations it manages
35    pub async fn close(&self) -> Result<(), Error> {
36        let allocations = self.allocations.lock().await;
37        for a in allocations.values() {
38            let mut a = a.lock().await;
39            a.close().await?;
40        }
41        Ok(())
42    }
43
44    // get_allocation fetches the allocation matching the passed FiveTuple
45    pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Mutex<Allocation>>> {
46        let allocations = self.allocations.lock().await;
47        if let Some(a) = allocations.get(&five_tuple.fingerprint()) {
48            Some(Arc::clone(a))
49        } else {
50            None
51        }
52    }
53
54    // create_allocation creates a new allocation and starts relaying
55    pub async fn create_allocation(
56        &self,
57        five_tuple: FiveTuple,
58        turn_socket: Arc<dyn Conn + Send + Sync>,
59        requested_port: u16,
60        lifetime: Duration,
61    ) -> Result<Arc<Mutex<Allocation>>, Error> {
62        if lifetime == Duration::from_secs(0) {
63            return Err(ERR_LIFETIME_ZERO.to_owned());
64        }
65
66        if self.get_allocation(&five_tuple).await.is_some() {
67            return Err(ERR_DUPE_FIVE_TUPLE.to_owned());
68        }
69
70        let (relay_socket, relay_addr) = self
71            .relay_addr_generator
72            .allocate_conn(true, requested_port)
73            .await?;
74        let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple.clone());
75        a.allocations = Some(Arc::clone(&self.allocations));
76
77        log::debug!("listening on relay addr: {:?}", a.relay_addr);
78        a.start(lifetime).await;
79        a.packet_handler().await;
80
81        let a = Arc::new(Mutex::new(a));
82        {
83            let mut allocations = self.allocations.lock().await;
84            allocations.insert(five_tuple.fingerprint(), Arc::clone(&a));
85        }
86
87        Ok(a)
88    }
89
90    // delete_allocation removes an allocation
91    pub async fn delete_allocation(&self, five_tuple: &FiveTuple) {
92        let fingerprint = five_tuple.fingerprint();
93
94        let mut allocations = self.allocations.lock().await;
95        let allocation = allocations.remove(&fingerprint);
96        if let Some(a) = allocation {
97            let mut a = a.lock().await;
98            if let Err(err) = a.close().await {
99                log::error!("Failed to close allocation: {}", err);
100            }
101        }
102    }
103
104    // create_reservation stores the reservation for the token+port
105    pub async fn create_reservation(&self, reservation_token: String, port: u16) {
106        let reservations = Arc::clone(&self.reservations);
107        let reservation_token2 = reservation_token.clone();
108
109        tokio::spawn(async move {
110            let sleep = tokio::time::sleep(Duration::from_secs(30));
111            tokio::pin!(sleep);
112            tokio::select! {
113                _ = &mut sleep => {
114                    let mut reservations = reservations.lock().await;
115                    reservations.remove(&reservation_token2);
116                },
117            }
118        });
119
120        let mut reservations = self.reservations.lock().await;
121        reservations.insert(reservation_token, port);
122    }
123
124    // get_reservation returns the port for a given reservation if it exists
125    pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> {
126        let reservations = self.reservations.lock().await;
127        if let Some(port) = reservations.get(reservation_token) {
128            Some(*port)
129        } else {
130            None
131        }
132    }
133
134    // get_random_even_port returns a random un-allocated udp4 port
135    pub async fn get_random_even_port(&self) -> Result<u16, Error> {
136        let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?;
137        Ok(addr.port())
138    }
139}