1#[cfg(test)]
2mod allocation_test;
3
4pub mod allocation_manager;
5pub mod channel_bind;
6pub mod five_tuple;
7pub mod permission;
8
9use std::collections::HashMap;
10use std::marker::{Send, Sync};
11use std::net::SocketAddr;
12use std::sync::atomic::Ordering;
13use std::sync::{Arc, Weak};
14
15use channel_bind::*;
16use five_tuple::*;
17use permission::*;
18use portable_atomic::{AtomicBool, AtomicUsize};
19use stun::agent::*;
20use stun::message::*;
21use stun::textattrs::Username;
22use tokio::sync::oneshot::{self, Sender};
23use tokio::sync::{mpsc, Mutex};
24use tokio::time::{Duration, Instant};
25use util::sync::Mutex as SyncMutex;
26use util::Conn;
27
28use crate::error::*;
29use crate::proto::chandata::*;
30use crate::proto::channum::*;
31use crate::proto::data::*;
32use crate::proto::peeraddr::*;
33use crate::proto::*;
34
/// Size, in bytes, of the buffer the packet handler uses to receive a single
/// datagram from the relay socket.
// NOTE(review): the name suggests this is meant as an MTU bound for RTP
// traffic — confirm that larger datagrams are not expected on the relay socket.
const RTP_MTU: usize = 1500;
36
/// Table of live allocations, keyed by the [`FiveTuple`] each one belongs to.
pub type AllocationMap = HashMap<FiveTuple, Arc<Allocation>>;
38
/// Identifying details of an [`Allocation`].
///
/// A value of this type is sent on the allocation's close-notification
/// channel (when one was supplied) as part of [`Allocation::close`].
#[derive(Debug, Clone)]
pub struct AllocationInfo {
    /// [`FiveTuple`] of the allocation.
    pub five_tuple: FiveTuple,

    /// Username of the allocation, as plain text.
    pub username: String,

    /// Relay address of the allocation.
    pub relay_addr: SocketAddr,

    /// Value of the allocation's relayed-bytes counter.
    ///
    /// Only present when the `metrics` feature is enabled.
    #[cfg(feature = "metrics")]
    pub relayed_bytes: usize,
}
55
56impl AllocationInfo {
57 pub fn new(
59 five_tuple: FiveTuple,
60 username: String,
61 relay_addr: SocketAddr,
62 #[cfg(feature = "metrics")] relayed_bytes: usize,
63 ) -> Self {
64 Self {
65 five_tuple,
66 username,
67 relay_addr,
68 #[cfg(feature = "metrics")]
69 relayed_bytes,
70 }
71 }
72}
73
/// A relay allocation tied to a single [`FiveTuple`].
///
/// It owns the permission and channel-binding tables for the allocation, the
/// handle used to reset its lifetime timer, and the sockets used to relay
/// traffic between peers and the client.
pub struct Allocation {
    /// Transport protocol of the allocation; `new` always sets `PROTO_UDP`.
    protocol: Protocol,
    /// Socket used to send relayed data back to the client.
    turn_socket: Arc<dyn Conn + Send + Sync>,
    /// Relay address of the allocation.
    pub(crate) relay_addr: SocketAddr,
    /// Socket the packet handler receives peer traffic from.
    pub(crate) relay_socket: Arc<dyn Conn + Send + Sync>,
    /// Key of this allocation in the [`AllocationMap`].
    five_tuple: FiveTuple,
    /// Username the allocation was created for.
    username: Username,
    /// Installed permissions, keyed by the peer's IP fingerprint.
    permissions: Arc<Mutex<HashMap<String, Permission>>>,
    /// Installed channel bindings, keyed by channel number.
    channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
    /// Weak handle to the map this allocation removes itself from.
    allocations: Weak<Mutex<AllocationMap>>,
    /// Sender used to reset the lifetime timer; `None` while no timer runs.
    reset_tx: SyncMutex<Option<mpsc::Sender<Duration>>>,
    /// Set by the lifetime timer task when it exits.
    timer_expired: Arc<AtomicBool>,
    /// Set once `close` has run.
    closed: AtomicBool,
    /// Relayed-bytes counter, reported in [`AllocationInfo`] when the
    /// `metrics` feature is enabled.
    pub(crate) relayed_bytes: AtomicUsize,
    /// Sender half of the packet handler's stop signal.
    drop_tx: Option<Sender<u32>>,
    /// Optional channel that receives an [`AllocationInfo`] on close.
    alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
}
93
/// Returns the textual form of `addr`'s IP address, without the port.
///
/// This is the key used for the permission table, so two addresses that
/// differ only in port yield the same fingerprint.
fn addr2ipfingerprint(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    ip.to_string()
}
97
98impl Allocation {
99 pub fn new(
101 turn_socket: Arc<dyn Conn + Send + Sync>,
102 relay_socket: Arc<dyn Conn + Send + Sync>,
103 relay_addr: SocketAddr,
104 five_tuple: FiveTuple,
105 username: Username,
106 allocation_map: Weak<Mutex<AllocationMap>>,
107 alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
108 ) -> Self {
109 Allocation {
110 protocol: PROTO_UDP,
111 turn_socket,
112 relay_addr,
113 relay_socket,
114 five_tuple,
115 username,
116 permissions: Arc::new(Mutex::new(HashMap::new())),
117 channel_bindings: Arc::new(Mutex::new(HashMap::new())),
118 allocations: allocation_map,
119 reset_tx: SyncMutex::new(None),
120 timer_expired: Arc::new(AtomicBool::new(false)),
121 closed: AtomicBool::new(false),
122 relayed_bytes: Default::default(),
123 drop_tx: None,
124 alloc_close_notify,
125 }
126 }
127
128 pub async fn has_permission(&self, addr: &SocketAddr) -> bool {
130 let permissions = self.permissions.lock().await;
131 permissions.get(&addr2ipfingerprint(addr)).is_some()
132 }
133
134 pub async fn add_permission(&self, mut p: Permission) {
136 let fingerprint = addr2ipfingerprint(&p.addr);
137
138 {
139 let permissions = self.permissions.lock().await;
140 if let Some(existed_permission) = permissions.get(&fingerprint) {
141 existed_permission.refresh(PERMISSION_TIMEOUT).await;
142 return;
143 }
144 }
145
146 p.permissions = Some(Arc::downgrade(&self.permissions));
147 p.start(PERMISSION_TIMEOUT).await;
148
149 {
150 let mut permissions = self.permissions.lock().await;
151 permissions.insert(fingerprint, p);
152 }
153 }
154
155 pub async fn remove_permission(&self, addr: &SocketAddr) -> bool {
157 let mut permissions = self.permissions.lock().await;
158 permissions.remove(&addr2ipfingerprint(addr)).is_some()
159 }
160
161 pub async fn add_channel_bind(&self, mut c: ChannelBind, lifetime: Duration) -> Result<()> {
164 {
165 if let Some(addr) = self.get_channel_addr(&c.number).await {
166 if addr != c.peer {
167 return Err(Error::ErrSameChannelDifferentPeer);
168 }
169 }
170
171 if let Some(number) = self.get_channel_number(&c.peer).await {
172 if number != c.number {
173 return Err(Error::ErrSameChannelDifferentPeer);
174 }
175 }
176 }
177
178 {
179 let channel_bindings = self.channel_bindings.lock().await;
180 if let Some(cb) = channel_bindings.get(&c.number) {
181 cb.refresh(lifetime).await;
182
183 self.add_permission(Permission::new(cb.peer)).await;
185
186 return Ok(());
187 }
188 }
189
190 let peer = c.peer;
191
192 c.channel_bindings = Some(Arc::downgrade(&self.channel_bindings));
194 c.start(lifetime).await;
195
196 {
197 let mut channel_bindings = self.channel_bindings.lock().await;
198 channel_bindings.insert(c.number, c);
199 }
200
201 self.add_permission(Permission::new(peer)).await;
203
204 Ok(())
205 }
206
207 pub async fn remove_channel_bind(&self, number: ChannelNumber) -> bool {
209 let mut channel_bindings = self.channel_bindings.lock().await;
210 channel_bindings.remove(&number).is_some()
211 }
212
213 pub async fn get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr> {
215 let channel_bindings = self.channel_bindings.lock().await;
216 channel_bindings.get(number).map(|cb| cb.peer)
217 }
218
219 pub async fn get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber> {
221 let channel_bindings = self.channel_bindings.lock().await;
222 for cb in channel_bindings.values() {
223 if cb.peer == *addr {
224 return Some(cb.number);
225 }
226 }
227 None
228 }
229
230 pub async fn close(&self) -> Result<()> {
232 if self.closed.load(Ordering::Acquire) {
233 return Err(Error::ErrClosed);
234 }
235
236 self.closed.store(true, Ordering::Release);
237 self.stop();
238
239 {
240 let mut permissions = self.permissions.lock().await;
241 for p in permissions.values_mut() {
242 p.stop();
243 }
244 }
245
246 {
247 let mut channel_bindings = self.channel_bindings.lock().await;
248 for c in channel_bindings.values_mut() {
249 c.stop();
250 }
251 }
252
253 log::trace!("allocation with {} closed!", self.five_tuple);
254
255 let _ = self.turn_socket.close().await;
256 let _ = self.relay_socket.close().await;
257
258 if let Some(notify_tx) = &self.alloc_close_notify {
259 let _ = notify_tx
260 .send(AllocationInfo {
261 five_tuple: self.five_tuple,
262 username: self.username.text.clone(),
263 relay_addr: self.relay_addr,
264 #[cfg(feature = "metrics")]
265 relayed_bytes: self.relayed_bytes.load(Ordering::Acquire),
266 })
267 .await;
268 }
269
270 Ok(())
271 }
272
273 pub async fn start(&self, lifetime: Duration) {
274 let (reset_tx, mut reset_rx) = mpsc::channel(1);
275 self.reset_tx.lock().replace(reset_tx);
276
277 let allocations = self.allocations.clone();
278 let five_tuple = self.five_tuple;
279 let timer_expired = Arc::clone(&self.timer_expired);
280
281 tokio::spawn(async move {
282 let timer = tokio::time::sleep(lifetime);
283 tokio::pin!(timer);
284 let mut done = false;
285
286 while !done {
287 tokio::select! {
288 _ = &mut timer => {
289 if let Some(allocs) = &allocations.upgrade(){
290 let mut allocs = allocs.lock().await;
291 if let Some(a) = allocs.remove(&five_tuple) {
292 let _ = a.close().await;
293 }
294 }
295 done = true;
296 },
297 result = reset_rx.recv() => {
298 if let Some(d) = result {
299 timer.as_mut().reset(Instant::now() + d);
300 } else {
301 done = true;
302 }
303 },
304 }
305 }
306
307 timer_expired.store(true, Ordering::SeqCst);
308 });
309 }
310
311 fn stop(&self) -> bool {
312 let reset_tx = self.reset_tx.lock().take();
313 reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst)
314 }
315
316 pub async fn refresh(&self, lifetime: Duration) {
318 let reset_tx = self.reset_tx.lock().clone();
319 if let Some(tx) = reset_tx {
320 let _ = tx.send(lifetime).await;
321 }
322 }
323
324 async fn packet_handler(&mut self) {
344 let five_tuple = self.five_tuple;
345 let relay_addr = self.relay_addr;
346 let relay_socket = Arc::clone(&self.relay_socket);
347 let turn_socket = Arc::clone(&self.turn_socket);
348 let allocations = self.allocations.clone();
349 let channel_bindings = Arc::clone(&self.channel_bindings);
350 let permissions = Arc::clone(&self.permissions);
351 let (drop_tx, drop_rx) = oneshot::channel::<u32>();
352 self.drop_tx = Some(drop_tx);
353
354 tokio::spawn(async move {
355 let mut buffer = vec![0u8; RTP_MTU];
356
357 tokio::pin!(drop_rx);
358
359 loop {
360 let (n, src_addr) = tokio::select! {
361 result = relay_socket.recv_from(&mut buffer) => {
362 match result {
363 Ok((n, src_addr)) => (n, src_addr),
364 Err(_) => {
365 if let Some(allocs) = &allocations.upgrade() {
366 let mut allocs = allocs.lock().await;
367 allocs.remove(&five_tuple);
368 }
369 break;
370 }
371 }
372 }
373 _ = drop_rx.as_mut() => {
374 log::trace!("allocation has stopped, stop packet_handler. five_tuple: {:?}", five_tuple);
375 break;
376 }
377 };
378
379 log::debug!(
380 "relay socket {:?} received {} bytes from {}",
381 relay_socket.local_addr(),
382 n,
383 src_addr
384 );
385
386 let cb_number = {
387 let mut cb_number = None;
388 let cbs = channel_bindings.lock().await;
389 for cb in cbs.values() {
390 if cb.peer == src_addr {
391 cb_number = Some(cb.number);
392 break;
393 }
394 }
395 cb_number
396 };
397
398 if let Some(number) = cb_number {
399 let mut channel_data = ChannelData {
400 data: buffer[..n].to_vec(),
401 number,
402 raw: vec![],
403 };
404 channel_data.encode();
405
406 if let Err(err) = turn_socket
407 .send_to(&channel_data.raw, five_tuple.src_addr)
408 .await
409 {
410 log::error!(
411 "Failed to send ChannelData from allocation {} {}",
412 src_addr,
413 err
414 );
415 }
416 } else {
417 let exist = {
418 let ps = permissions.lock().await;
419 ps.get(&addr2ipfingerprint(&src_addr)).is_some()
420 };
421
422 if exist {
423 let msg = {
424 let peer_address_attr = PeerAddress {
425 ip: src_addr.ip(),
426 port: src_addr.port(),
427 };
428 let data_attr = Data(buffer[..n].to_vec());
429
430 let mut msg = Message::new();
431 if let Err(err) = msg.build(&[
432 Box::new(TransactionId::new()),
433 Box::new(MessageType::new(METHOD_DATA, CLASS_INDICATION)),
434 Box::new(peer_address_attr),
435 Box::new(data_attr),
436 ]) {
437 log::error!(
438 "Failed to send DataIndication from allocation {} {}",
439 src_addr,
440 err
441 );
442 None
443 } else {
444 Some(msg)
445 }
446 };
447
448 if let Some(msg) = msg {
449 log::debug!(
450 "relaying message from {} to client at {}",
451 src_addr,
452 five_tuple.src_addr
453 );
454 if let Err(err) =
455 turn_socket.send_to(&msg.raw, five_tuple.src_addr).await
456 {
457 log::error!(
458 "Failed to send DataIndication from allocation {} {}",
459 src_addr,
460 err
461 );
462 }
463 }
464 } else {
465 log::info!(
466 "No Permission or Channel exists for {} on allocation {}",
467 src_addr,
468 relay_addr
469 );
470 }
471 }
472 }
473 });
474 }
475}