1#[cfg(test)]
2mod allocation_test;
3
4pub mod allocation_manager;
5pub mod channel_bind;
6pub mod five_tuple;
7pub mod permission;
8
9use crate::errors::*;
10use crate::proto::{chandata::*, channum::*, data::*, peeraddr::*, *};
11use channel_bind::*;
12use five_tuple::*;
13use permission::*;
14
15use stun::agent::*;
16use stun::message::*;
17
18use util::{Conn, Error};
19
20use tokio::sync::{mpsc, Mutex};
21use tokio::time::{Duration, Instant};
22
23use std::collections::HashMap;
24use std::marker::{Send, Sync};
25use std::net::SocketAddr;
26use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
27
/// Size, in bytes, of the buffer used to receive one packet from the relay
/// socket in `Allocation::packet_handler`.
const RTP_MTU: usize = 1500;

/// Shared, mutex-protected table of allocations keyed by a `String`.
/// Entries are removed in this file using `FiveTuple::fingerprint()` as the key.
pub type AllocationMap = Arc<Mutex<HashMap<String, Arc<Mutex<Allocation>>>>>;
31
/// State of a single relay allocation: the two sockets it relays between,
/// the permissions and channel bindings installed on it, and the handles
/// used to drive its lifetime timer.
pub struct Allocation {
    /// Transport protocol of the allocation; `new` always sets `PROTO_UDP`.
    protocol: Protocol,
    /// Socket used to send relayed data to the client (`five_tuple.src_addr`).
    turn_socket: Arc<dyn Conn + Send + Sync>,
    /// Address associated with the relay socket; in this file it is only read
    /// for log output.
    pub(crate) relay_addr: SocketAddr,
    /// Socket on which packets coming from peers are received.
    pub(crate) relay_socket: Arc<dyn Conn + Send + Sync>,
    /// Five-tuple identifying the allocation; its fingerprint is the key used
    /// when removing the allocation from `allocations`.
    five_tuple: FiveTuple,
    /// Installed permissions, keyed by the peer's IP address rendered as a
    /// string (the port is not part of the key).
    permissions: Arc<Mutex<HashMap<String, Permission>>>,
    /// Installed channel bindings, keyed by channel number.
    channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
    /// Map this allocation removes itself from when its lifetime timer fires
    /// or the relay socket read fails. `new` leaves it `None`; presumably the
    /// owner of the map sets it afterwards -- confirm against the caller.
    pub(crate) allocations: Option<AllocationMap>,
    /// Sender used to push a new lifetime to the timer task; `None` before
    /// `start` and after `stop`.
    reset_tx: Option<mpsc::Sender<Duration>>,
    /// Set to `true` once the timer task spawned by `start` has finished.
    timer_expired: Arc<AtomicBool>,
    /// Whether `close` has already run.
    closed: bool,
}
47
/// Returns the textual form of the IP address of `addr`, without the port.
///
/// Permissions are keyed by this string, so two socket addresses that differ
/// only in their port produce the same fingerprint.
fn addr2ipfingerprint(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    ip.to_string()
}
51
52impl Allocation {
53 pub fn new(
55 turn_socket: Arc<dyn Conn + Send + Sync>,
56 relay_socket: Arc<dyn Conn + Send + Sync>,
57 relay_addr: SocketAddr,
58 five_tuple: FiveTuple,
59 ) -> Self {
60 Allocation {
61 protocol: PROTO_UDP,
62 turn_socket,
63 relay_addr,
64 relay_socket,
65 five_tuple,
66 permissions: Arc::new(Mutex::new(HashMap::new())),
67 channel_bindings: Arc::new(Mutex::new(HashMap::new())),
68 allocations: None,
69 reset_tx: None,
70 timer_expired: Arc::new(AtomicBool::new(false)),
71 closed: false,
72 }
73 }
74
75 pub async fn has_permission(&self, addr: &SocketAddr) -> bool {
77 let permissions = self.permissions.lock().await;
78 permissions.get(&addr2ipfingerprint(addr)).is_some()
79 }
80
81 pub async fn add_permission(&self, mut p: Permission) {
83 let fingerprint = addr2ipfingerprint(&p.addr);
84
85 {
86 let permissions = self.permissions.lock().await;
87 if let Some(existed_permission) = permissions.get(&fingerprint) {
88 existed_permission.refresh(PERMISSION_TIMEOUT).await;
89 return;
90 }
91 }
92
93 p.permissions = Some(Arc::clone(&self.permissions));
94 p.start(PERMISSION_TIMEOUT).await;
95
96 {
97 let mut permissions = self.permissions.lock().await;
98 permissions.insert(fingerprint, p);
99 }
100 }
101
102 pub async fn remove_permission(&self, addr: &SocketAddr) -> bool {
104 let mut permissions = self.permissions.lock().await;
105 permissions.remove(&addr2ipfingerprint(addr)).is_some()
106 }
107
108 pub async fn add_channel_bind(
111 &self,
112 mut c: ChannelBind,
113 lifetime: Duration,
114 ) -> Result<(), Error> {
115 {
116 if let Some(addr) = self.get_channel_addr(&c.number).await {
117 if addr != c.peer {
118 return Err(ERR_SAME_CHANNEL_DIFFERENT_PEER.to_owned());
119 }
120 }
121
122 if let Some(number) = self.get_channel_number(&c.peer).await {
123 if number != c.number {
124 return Err(ERR_SAME_CHANNEL_DIFFERENT_PEER.to_owned());
125 }
126 }
127 }
128
129 {
130 let channel_bindings = self.channel_bindings.lock().await;
131 if let Some(cb) = channel_bindings.get(&c.number) {
132 cb.refresh(lifetime).await;
133
134 self.add_permission(Permission::new(cb.peer)).await;
136
137 return Ok(());
138 }
139 }
140
141 let peer = c.peer;
142
143 c.channel_bindings = Some(Arc::clone(&self.channel_bindings));
145 c.start(lifetime).await;
146
147 {
148 let mut channel_bindings = self.channel_bindings.lock().await;
149 channel_bindings.insert(c.number, c);
150 }
151
152 self.add_permission(Permission::new(peer)).await;
154
155 Ok(())
156 }
157
158 pub async fn remove_channel_bind(&self, number: ChannelNumber) -> bool {
160 let mut channel_bindings = self.channel_bindings.lock().await;
161 channel_bindings.remove(&number).is_some()
162 }
163
164 pub async fn get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr> {
166 let channel_bindings = self.channel_bindings.lock().await;
167 if let Some(cb) = channel_bindings.get(number) {
168 Some(cb.peer)
169 } else {
170 None
171 }
172 }
173
174 pub async fn get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber> {
176 let channel_bindings = self.channel_bindings.lock().await;
177 for cb in channel_bindings.values() {
178 if cb.peer == *addr {
179 return Some(cb.number);
180 }
181 }
182 None
183 }
184
185 pub async fn close(&mut self) -> Result<(), Error> {
187 if self.closed {
188 return Err(ERR_CLOSED.to_owned());
189 }
190
191 self.closed = true;
192 self.stop();
193
194 {
195 let mut permissions = self.permissions.lock().await;
196 for p in permissions.values_mut() {
197 p.stop();
198 }
199 }
200
201 {
202 let mut channel_bindings = self.channel_bindings.lock().await;
203 for c in channel_bindings.values_mut() {
204 c.stop();
205 }
206 }
207
208 log::trace!("allocation with {} closed!", self.five_tuple);
209
210 Ok(())
211 }
212
213 pub async fn start(&mut self, lifetime: Duration) {
214 let (reset_tx, mut reset_rx) = mpsc::channel(1);
215 self.reset_tx = Some(reset_tx);
216
217 let allocations = self.allocations.clone();
218 let five_tuple = self.five_tuple.clone();
219 let timer_expired = Arc::clone(&self.timer_expired);
220
221 tokio::spawn(async move {
222 let timer = tokio::time::sleep(lifetime);
223 tokio::pin!(timer);
224 let mut done = false;
225
226 while !done {
227 tokio::select! {
228 _ = &mut timer => {
229 if let Some(allocs) = &allocations{
230 let mut alls = allocs.lock().await;
231 if let Some(a) = alls.remove(&five_tuple.fingerprint()) {
232 let mut a = a.lock().await;
233 let _ = a.close().await;
234 }
235 }
236 done = true;
237 },
238 result = reset_rx.recv() => {
239 if let Some(d) = result {
240 timer.as_mut().reset(Instant::now() + d);
241 } else {
242 done = true;
243 }
244 },
245 }
246 }
247
248 timer_expired.store(true, Ordering::SeqCst);
249 });
250 }
251
252 pub fn stop(&mut self) -> bool {
253 let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
254 self.reset_tx.take();
255 expired
256 }
257
258 pub async fn refresh(&self, lifetime: Duration) {
260 if let Some(tx) = &self.reset_tx {
261 let _ = tx.send(lifetime).await;
262 }
263 }
264
265 async fn packet_handler(&self) {
285 let five_tuple = self.five_tuple.clone();
286 let relay_addr = self.relay_addr;
287 let relay_socket = Arc::clone(&self.relay_socket);
288 let turn_socket = Arc::clone(&self.turn_socket);
289 let allocations = self.allocations.clone();
290 let channel_bindings = Arc::clone(&self.channel_bindings);
291 let permissions = Arc::clone(&self.permissions);
292
293 tokio::spawn(async move {
294 let mut buffer = vec![0u8; RTP_MTU];
295
296 loop {
297 let (n, src_addr) = match relay_socket.recv_from(&mut buffer).await {
298 Ok((n, src_addr)) => (n, src_addr),
299 Err(_) => {
300 if let Some(allocs) = &allocations {
301 let mut alls = allocs.lock().await;
302 alls.remove(&five_tuple.fingerprint());
303 }
304 break;
305 }
306 };
307
308 log::debug!(
309 "relay socket {:?} received {} bytes from {}",
310 relay_socket.local_addr().await,
311 n,
312 src_addr
313 );
314
315 let cb_number = {
316 let mut cb_number = None;
317 let cbs = channel_bindings.lock().await;
318 for cb in cbs.values() {
319 if cb.peer == src_addr {
320 cb_number = Some(cb.number);
321 break;
322 }
323 }
324 cb_number
325 };
326
327 if let Some(number) = cb_number {
328 let mut channel_data = ChannelData {
329 data: buffer[..n].to_vec(),
330 number,
331 raw: vec![],
332 };
333 channel_data.encode();
334
335 if let Err(err) = turn_socket
336 .send_to(&channel_data.raw, five_tuple.src_addr)
337 .await
338 {
339 log::error!(
340 "Failed to send ChannelData from allocation {} {}",
341 src_addr,
342 err
343 );
344 }
345 } else {
346 let exist = {
347 let ps = permissions.lock().await;
348 ps.get(&addr2ipfingerprint(&src_addr)).is_some()
349 };
350
351 if exist {
352 let msg = {
353 let peer_address_attr = PeerAddress {
354 ip: src_addr.ip(),
355 port: src_addr.port(),
356 };
357 let data_attr = Data(buffer[..n].to_vec());
358
359 let mut msg = Message::new();
360 if let Err(err) = msg.build(&[
361 Box::new(TransactionId::new()),
362 Box::new(MessageType::new(METHOD_DATA, CLASS_INDICATION)),
363 Box::new(peer_address_attr),
364 Box::new(data_attr),
365 ]) {
366 log::error!(
367 "Failed to send DataIndication from allocation {} {}",
368 src_addr,
369 err
370 );
371 None
372 } else {
373 Some(msg)
374 }
375 };
376
377 if let Some(msg) = msg {
378 log::debug!(
379 "relaying message from {} to client at {}",
380 src_addr,
381 five_tuple.src_addr
382 );
383 if let Err(err) =
384 turn_socket.send_to(&msg.raw, five_tuple.src_addr).await
385 {
386 log::error!(
387 "Failed to send DataIndication from allocation {} {}",
388 src_addr,
389 err
390 );
391 }
392 }
393 } else {
394 log::info!(
395 "No Permission or Channel exists for {} on allocation {}",
396 src_addr,
397 relay_addr
398 );
399 }
400 }
401 }
402 });
403 }
404}