webrtc_turn/allocation/
permission.rs1use super::*;
2
3use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
4use tokio::sync::Mutex;
5use tokio::time::{Duration, Instant};
6
7pub(crate) const PERMISSION_TIMEOUT: Duration = Duration::from_secs(5 * 60);
8
9pub struct Permission {
13 pub(crate) addr: SocketAddr,
14 pub(crate) permissions: Option<Arc<Mutex<HashMap<String, Permission>>>>,
15 reset_tx: Option<mpsc::Sender<Duration>>,
16 timer_expired: Arc<AtomicBool>,
17}
18
19impl Permission {
20 pub fn new(addr: SocketAddr) -> Self {
22 Permission {
23 addr,
24 permissions: None,
25 reset_tx: None,
26 timer_expired: Arc::new(AtomicBool::new(false)),
27 }
28 }
29
30 pub(crate) async fn start(&mut self, lifetime: Duration) {
31 let (reset_tx, mut reset_rx) = mpsc::channel(1);
32 self.reset_tx = Some(reset_tx);
33
34 let permissions = self.permissions.clone();
35 let addr = self.addr;
36 let timer_expired = Arc::clone(&self.timer_expired);
37
38 tokio::spawn(async move {
39 let timer = tokio::time::sleep(lifetime);
40 tokio::pin!(timer);
41 let mut done = false;
42
43 while !done {
44 tokio::select! {
45 _ = &mut timer => {
46 if let Some(perms) = &permissions{
47 let mut p = perms.lock().await;
48 p.remove(&addr2ipfingerprint(&addr));
49 }
50 done = true;
51 },
52 result = reset_rx.recv() => {
53 if let Some(d) = result {
54 timer.as_mut().reset(Instant::now() + d);
55 } else {
56 done = true;
57 }
58 },
59 }
60 }
61
62 timer_expired.store(true, Ordering::SeqCst);
63 });
64 }
65
66 pub(crate) fn stop(&mut self) -> bool {
67 let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
68 self.reset_tx.take();
69 expired
70 }
71
72 pub(crate) async fn refresh(&self, lifetime: Duration) {
73 if let Some(tx) = &self.reset_tx {
74 let _ = tx.send(lifetime).await;
75 }
76 }
77}