webrtc_turn/allocation/
permission.rs

1use super::*;
2
3use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
4use tokio::sync::Mutex;
5use tokio::time::{Duration, Instant};
6
7pub(crate) const PERMISSION_TIMEOUT: Duration = Duration::from_secs(5 * 60);
8
9// Permission represents a TURN permission. TURN permissions mimic the address-restricted
10// filtering mechanism of NATs that comply with [RFC4787].
11// https://tools.ietf.org/html/rfc5766#section-2.3
12pub struct Permission {
13    pub(crate) addr: SocketAddr,
14    pub(crate) permissions: Option<Arc<Mutex<HashMap<String, Permission>>>>,
15    reset_tx: Option<mpsc::Sender<Duration>>,
16    timer_expired: Arc<AtomicBool>,
17}
18
19impl Permission {
20    // NewPermission create a new Permission
21    pub fn new(addr: SocketAddr) -> Self {
22        Permission {
23            addr,
24            permissions: None,
25            reset_tx: None,
26            timer_expired: Arc::new(AtomicBool::new(false)),
27        }
28    }
29
30    pub(crate) async fn start(&mut self, lifetime: Duration) {
31        let (reset_tx, mut reset_rx) = mpsc::channel(1);
32        self.reset_tx = Some(reset_tx);
33
34        let permissions = self.permissions.clone();
35        let addr = self.addr;
36        let timer_expired = Arc::clone(&self.timer_expired);
37
38        tokio::spawn(async move {
39            let timer = tokio::time::sleep(lifetime);
40            tokio::pin!(timer);
41            let mut done = false;
42
43            while !done {
44                tokio::select! {
45                    _ = &mut timer => {
46                        if let Some(perms) = &permissions{
47                            let mut p = perms.lock().await;
48                            p.remove(&addr2ipfingerprint(&addr));
49                        }
50                        done = true;
51                    },
52                    result = reset_rx.recv() => {
53                        if let Some(d) = result {
54                            timer.as_mut().reset(Instant::now() + d);
55                        } else {
56                            done = true;
57                        }
58                    },
59                }
60            }
61
62            timer_expired.store(true, Ordering::SeqCst);
63        });
64    }
65
66    pub(crate) fn stop(&mut self) -> bool {
67        let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
68        self.reset_tx.take();
69        expired
70    }
71
72    pub(crate) async fn refresh(&self, lifetime: Duration) {
73        if let Some(tx) = &self.reset_tx {
74            let _ = tx.send(lifetime).await;
75        }
76    }
77}