webrtc_turn/allocation/
channel_bind.rs1#[cfg(test)]
2mod channel_bind_test;
3
4use super::*;
5use crate::proto::channum::*;
6
7use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
8use tokio::sync::Mutex;
9use tokio::time::{Duration, Instant};
10
11#[derive(Clone)]
14pub struct ChannelBind {
15 pub(crate) peer: SocketAddr,
16 pub(crate) number: ChannelNumber,
17 pub(crate) channel_bindings: Option<Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>>,
18 reset_tx: Option<mpsc::Sender<Duration>>,
19 timer_expired: Arc<AtomicBool>,
20}
21
22impl ChannelBind {
23 pub fn new(number: ChannelNumber, peer: SocketAddr) -> Self {
25 ChannelBind {
26 number,
27 peer,
28 channel_bindings: None,
29 reset_tx: None,
30 timer_expired: Arc::new(AtomicBool::new(false)),
31 }
32 }
33
34 pub(crate) async fn start(&mut self, lifetime: Duration) {
35 let (reset_tx, mut reset_rx) = mpsc::channel(1);
36 self.reset_tx = Some(reset_tx);
37
38 let channel_bindings = self.channel_bindings.clone();
39 let number = self.number;
40 let timer_expired = Arc::clone(&self.timer_expired);
41
42 tokio::spawn(async move {
43 let timer = tokio::time::sleep(lifetime);
44 tokio::pin!(timer);
45 let mut done = false;
46
47 while !done {
48 tokio::select! {
49 _ = &mut timer => {
50 if let Some(cbs) = &channel_bindings{
51 let mut cb = cbs.lock().await;
52 if cb.remove(&number).is_none() {
53 log::error!("Failed to remove ChannelBind for {}", number);
54 }
55 }
56 done = true;
57 },
58 result = reset_rx.recv() => {
59 if let Some(d) = result {
60 timer.as_mut().reset(Instant::now() + d);
61 } else {
62 done = true;
63 }
64 },
65 }
66 }
67
68 timer_expired.store(true, Ordering::SeqCst);
69 });
70 }
71
72 pub(crate) fn stop(&mut self) -> bool {
73 let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
74 self.reset_tx.take();
75 expired
76 }
77
78 pub(crate) async fn refresh(&self, lifetime: Duration) {
79 if let Some(tx) = &self.reset_tx {
80 let _ = tx.send(lifetime).await;
81 }
82 }
83}