webrtc_turn/allocation/
channel_bind.rs

1#[cfg(test)]
2mod channel_bind_test;
3
4use super::*;
5use crate::proto::channum::*;
6
7use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
8use tokio::sync::Mutex;
9use tokio::time::{Duration, Instant};
10
11// ChannelBind represents a TURN Channel
12// https://tools.ietf.org/html/rfc5766#section-2.5
13#[derive(Clone)]
14pub struct ChannelBind {
15    pub(crate) peer: SocketAddr,
16    pub(crate) number: ChannelNumber,
17    pub(crate) channel_bindings: Option<Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>>,
18    reset_tx: Option<mpsc::Sender<Duration>>,
19    timer_expired: Arc<AtomicBool>,
20}
21
22impl ChannelBind {
23    // NewChannelBind creates a new ChannelBind
24    pub fn new(number: ChannelNumber, peer: SocketAddr) -> Self {
25        ChannelBind {
26            number,
27            peer,
28            channel_bindings: None,
29            reset_tx: None,
30            timer_expired: Arc::new(AtomicBool::new(false)),
31        }
32    }
33
34    pub(crate) async fn start(&mut self, lifetime: Duration) {
35        let (reset_tx, mut reset_rx) = mpsc::channel(1);
36        self.reset_tx = Some(reset_tx);
37
38        let channel_bindings = self.channel_bindings.clone();
39        let number = self.number;
40        let timer_expired = Arc::clone(&self.timer_expired);
41
42        tokio::spawn(async move {
43            let timer = tokio::time::sleep(lifetime);
44            tokio::pin!(timer);
45            let mut done = false;
46
47            while !done {
48                tokio::select! {
49                    _ = &mut timer => {
50                        if let Some(cbs) = &channel_bindings{
51                            let mut cb = cbs.lock().await;
52                            if cb.remove(&number).is_none() {
53                                log::error!("Failed to remove ChannelBind for {}", number);
54                            }
55                        }
56                        done = true;
57                    },
58                    result = reset_rx.recv() => {
59                        if let Some(d) = result {
60                            timer.as_mut().reset(Instant::now() + d);
61                        } else {
62                            done = true;
63                        }
64                    },
65                }
66            }
67
68            timer_expired.store(true, Ordering::SeqCst);
69        });
70    }
71
72    pub(crate) fn stop(&mut self) -> bool {
73        let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
74        self.reset_tx.take();
75        expired
76    }
77
78    pub(crate) async fn refresh(&self, lifetime: Duration) {
79        if let Some(tx) = &self.reset_tx {
80            let _ = tx.send(lifetime).await;
81        }
82    }
83}