Skip to main content

clia_turn/allocation/
allocation_manager.rs

1#[cfg(test)]
2mod allocation_manager_test;
3
4use std::collections::HashMap;
5
6use futures::future;
7use stun::textattrs::Username;
8use tokio::sync::mpsc;
9use util::Conn;
10
11use super::*;
12use crate::error::*;
13use crate::relay::*;
14
15/// `ManagerConfig` a bag of config params for `Manager`.
16pub struct ManagerConfig {
17    pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
18    pub alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
19}
20
21/// `Manager` is used to hold active allocations.
22pub struct Manager {
23    allocations: Arc<Mutex<AllocationMap>>,
24    reservations: Arc<Mutex<HashMap<String, u16>>>,
25    relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
26    alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
27}
28
29impl Manager {
30    /// Creates a new [`Manager`].
31    pub fn new(config: ManagerConfig) -> Self {
32        Manager {
33            allocations: Arc::new(Mutex::new(HashMap::new())),
34            reservations: Arc::new(Mutex::new(HashMap::new())),
35            relay_addr_generator: config.relay_addr_generator,
36            alloc_close_notify: config.alloc_close_notify,
37        }
38    }
39
40    /// Closes this [`manager`] and closes all [`Allocation`]s it manages.
41    pub async fn close(&self) -> Result<()> {
42        let allocations = self.allocations.lock().await;
43        for a in allocations.values() {
44            a.close().await?;
45        }
46        Ok(())
47    }
48
49    /// Returns the information about the all [`Allocation`]s associated with
50    /// the specified [`FiveTuple`]s.
51    pub async fn get_allocations_info(
52        &self,
53        five_tuples: Option<Vec<FiveTuple>>,
54    ) -> HashMap<FiveTuple, AllocationInfo> {
55        let mut infos = HashMap::new();
56
57        let guarded = self.allocations.lock().await;
58
59        guarded.iter().for_each(|(five_tuple, alloc)| {
60            if five_tuples.is_none() || five_tuples.as_ref().unwrap().contains(five_tuple) {
61                infos.insert(
62                    *five_tuple,
63                    AllocationInfo::new(
64                        *five_tuple,
65                        alloc.username.text.clone(),
66                        alloc.relay_addr,
67                        #[cfg(feature = "metrics")]
68                        alloc.relayed_bytes.load(Ordering::Acquire),
69                    ),
70                );
71            }
72        });
73
74        infos
75    }
76
77    /// Fetches the [`Allocation`] matching the passed [`FiveTuple`].
78    pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> {
79        let allocations = self.allocations.lock().await;
80        allocations.get(five_tuple).cloned()
81    }
82
83    /// Creates a new [`Allocation`] and starts relaying.
84    pub async fn create_allocation(
85        &self,
86        five_tuple: FiveTuple,
87        turn_socket: Arc<dyn Conn + Send + Sync>,
88        requested_port: u16,
89        lifetime: Duration,
90        username: Username,
91        use_ipv4: bool,
92    ) -> Result<Arc<Allocation>> {
93        if lifetime == Duration::from_secs(0) {
94            return Err(Error::ErrLifetimeZero);
95        }
96
97        if self.get_allocation(&five_tuple).await.is_some() {
98            return Err(Error::ErrDupeFiveTuple);
99        }
100
101        let (relay_socket, relay_addr) = self
102            .relay_addr_generator
103            .allocate_conn(use_ipv4, requested_port)
104            .await?;
105        let mut a = Allocation::new(
106            turn_socket,
107            relay_socket,
108            relay_addr,
109            five_tuple,
110            username,
111            Arc::downgrade(&self.allocations),
112            self.alloc_close_notify.clone(),
113        );
114
115        log::debug!("listening on relay addr: {:?}", a.relay_addr);
116        a.start(lifetime).await;
117        a.packet_handler().await;
118
119        let a = Arc::new(a);
120        {
121            let mut allocations = self.allocations.lock().await;
122            allocations.insert(five_tuple, Arc::clone(&a));
123        }
124
125        Ok(a)
126    }
127
128    /// Removes an [`Allocation`].
129    pub async fn delete_allocation(&self, five_tuple: &FiveTuple) {
130        let allocation = self.allocations.lock().await.remove(five_tuple);
131
132        if let Some(a) = allocation {
133            if let Err(err) = a.close().await {
134                log::error!("Failed to close allocation: {}", err);
135            }
136        }
137    }
138
139    /// Deletes the [`Allocation`]s according to the specified username `name`.
140    pub async fn delete_allocations_by_username(&self, name: &str) {
141        let to_delete = {
142            let mut allocations = self.allocations.lock().await;
143
144            let mut to_delete = Vec::new();
145
146            // TODO(logist322): Use `.drain_filter()` once stabilized.
147            allocations.retain(|_, allocation| {
148                let match_name = allocation.username.text == name;
149
150                if match_name {
151                    to_delete.push(Arc::clone(allocation));
152                }
153
154                !match_name
155            });
156
157            to_delete
158        };
159
160        future::join_all(to_delete.iter().map(|a| async move {
161            if let Err(err) = a.close().await {
162                log::error!("Failed to close allocation: {}", err);
163            }
164        }))
165        .await;
166    }
167
168    /// Stores the reservation for the token+port.
169    pub async fn create_reservation(&self, reservation_token: String, port: u16) {
170        let reservations = Arc::clone(&self.reservations);
171        let reservation_token2 = reservation_token.clone();
172
173        tokio::spawn(async move {
174            let sleep = tokio::time::sleep(Duration::from_secs(30));
175            tokio::pin!(sleep);
176            tokio::select! {
177                _ = &mut sleep => {
178                    let mut reservations = reservations.lock().await;
179                    reservations.remove(&reservation_token2);
180                },
181            }
182        });
183
184        let mut reservations = self.reservations.lock().await;
185        reservations.insert(reservation_token, port);
186    }
187
188    /// Returns the port for a given reservation if it exists.
189    pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> {
190        let reservations = self.reservations.lock().await;
191        reservations.get(reservation_token).copied()
192    }
193
194    /// Returns a random un-allocated udp4 port.
195    pub async fn get_random_even_port(&self) -> Result<u16> {
196        let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?;
197        Ok(addr.port())
198    }
199}