webrtc_turn/allocation/
mod.rs

1#[cfg(test)]
2mod allocation_test;
3
4pub mod allocation_manager;
5pub mod channel_bind;
6pub mod five_tuple;
7pub mod permission;
8
9use crate::errors::*;
10use crate::proto::{chandata::*, channum::*, data::*, peeraddr::*, *};
11use channel_bind::*;
12use five_tuple::*;
13use permission::*;
14
15use stun::agent::*;
16use stun::message::*;
17
18use util::{Conn, Error};
19
20use tokio::sync::{mpsc, Mutex};
21use tokio::time::{Duration, Instant};
22
23use std::collections::HashMap;
24use std::marker::{Send, Sync};
25use std::net::SocketAddr;
26use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
27
28const RTP_MTU: usize = 1500;
29
30pub type AllocationMap = Arc<Mutex<HashMap<String, Arc<Mutex<Allocation>>>>>;
31
32// Allocation is tied to a FiveTuple and relays traffic
33// use create_allocation and get_allocation to operate
34pub struct Allocation {
35    protocol: Protocol,
36    turn_socket: Arc<dyn Conn + Send + Sync>,
37    pub(crate) relay_addr: SocketAddr,
38    pub(crate) relay_socket: Arc<dyn Conn + Send + Sync>,
39    five_tuple: FiveTuple,
40    permissions: Arc<Mutex<HashMap<String, Permission>>>,
41    channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
42    pub(crate) allocations: Option<AllocationMap>,
43    reset_tx: Option<mpsc::Sender<Duration>>,
44    timer_expired: Arc<AtomicBool>,
45    closed: bool, // Option<mpsc::Receiver<()>>,
46}
47
48fn addr2ipfingerprint(addr: &SocketAddr) -> String {
49    addr.ip().to_string()
50}
51
52impl Allocation {
53    // creates a new instance of NewAllocation.
54    pub fn new(
55        turn_socket: Arc<dyn Conn + Send + Sync>,
56        relay_socket: Arc<dyn Conn + Send + Sync>,
57        relay_addr: SocketAddr,
58        five_tuple: FiveTuple,
59    ) -> Self {
60        Allocation {
61            protocol: PROTO_UDP,
62            turn_socket,
63            relay_addr,
64            relay_socket,
65            five_tuple,
66            permissions: Arc::new(Mutex::new(HashMap::new())),
67            channel_bindings: Arc::new(Mutex::new(HashMap::new())),
68            allocations: None,
69            reset_tx: None,
70            timer_expired: Arc::new(AtomicBool::new(false)),
71            closed: false,
72        }
73    }
74
75    // has_permission gets the Permission from the allocation
76    pub async fn has_permission(&self, addr: &SocketAddr) -> bool {
77        let permissions = self.permissions.lock().await;
78        permissions.get(&addr2ipfingerprint(addr)).is_some()
79    }
80
81    // add_permission adds a new permission to the allocation
82    pub async fn add_permission(&self, mut p: Permission) {
83        let fingerprint = addr2ipfingerprint(&p.addr);
84
85        {
86            let permissions = self.permissions.lock().await;
87            if let Some(existed_permission) = permissions.get(&fingerprint) {
88                existed_permission.refresh(PERMISSION_TIMEOUT).await;
89                return;
90            }
91        }
92
93        p.permissions = Some(Arc::clone(&self.permissions));
94        p.start(PERMISSION_TIMEOUT).await;
95
96        {
97            let mut permissions = self.permissions.lock().await;
98            permissions.insert(fingerprint, p);
99        }
100    }
101
102    // remove_permission removes the net.Addr's fingerprint from the allocation's permissions
103    pub async fn remove_permission(&self, addr: &SocketAddr) -> bool {
104        let mut permissions = self.permissions.lock().await;
105        permissions.remove(&addr2ipfingerprint(addr)).is_some()
106    }
107
108    // add_channel_bind adds a new ChannelBind to the allocation, it also updates the
109    // permissions needed for this ChannelBind
110    pub async fn add_channel_bind(
111        &self,
112        mut c: ChannelBind,
113        lifetime: Duration,
114    ) -> Result<(), Error> {
115        {
116            if let Some(addr) = self.get_channel_addr(&c.number).await {
117                if addr != c.peer {
118                    return Err(ERR_SAME_CHANNEL_DIFFERENT_PEER.to_owned());
119                }
120            }
121
122            if let Some(number) = self.get_channel_number(&c.peer).await {
123                if number != c.number {
124                    return Err(ERR_SAME_CHANNEL_DIFFERENT_PEER.to_owned());
125                }
126            }
127        }
128
129        {
130            let channel_bindings = self.channel_bindings.lock().await;
131            if let Some(cb) = channel_bindings.get(&c.number) {
132                cb.refresh(lifetime).await;
133
134                // Channel binds also refresh permissions.
135                self.add_permission(Permission::new(cb.peer)).await;
136
137                return Ok(());
138            }
139        }
140
141        let peer = c.peer;
142
143        // Add or refresh this channel.
144        c.channel_bindings = Some(Arc::clone(&self.channel_bindings));
145        c.start(lifetime).await;
146
147        {
148            let mut channel_bindings = self.channel_bindings.lock().await;
149            channel_bindings.insert(c.number, c);
150        }
151
152        // Channel binds also refresh permissions.
153        self.add_permission(Permission::new(peer)).await;
154
155        Ok(())
156    }
157
158    // remove_channel_bind removes the ChannelBind from this allocation by id
159    pub async fn remove_channel_bind(&self, number: ChannelNumber) -> bool {
160        let mut channel_bindings = self.channel_bindings.lock().await;
161        channel_bindings.remove(&number).is_some()
162    }
163
164    // get_channel_addr gets the ChannelBind's addr
165    pub async fn get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr> {
166        let channel_bindings = self.channel_bindings.lock().await;
167        if let Some(cb) = channel_bindings.get(number) {
168            Some(cb.peer)
169        } else {
170            None
171        }
172    }
173
174    // GetChannelByAddr gets the ChannelBind's number from this allocation by net.Addr
175    pub async fn get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber> {
176        let channel_bindings = self.channel_bindings.lock().await;
177        for cb in channel_bindings.values() {
178            if cb.peer == *addr {
179                return Some(cb.number);
180            }
181        }
182        None
183    }
184
185    // Close closes the allocation
186    pub async fn close(&mut self) -> Result<(), Error> {
187        if self.closed {
188            return Err(ERR_CLOSED.to_owned());
189        }
190
191        self.closed = true;
192        self.stop();
193
194        {
195            let mut permissions = self.permissions.lock().await;
196            for p in permissions.values_mut() {
197                p.stop();
198            }
199        }
200
201        {
202            let mut channel_bindings = self.channel_bindings.lock().await;
203            for c in channel_bindings.values_mut() {
204                c.stop();
205            }
206        }
207
208        log::trace!("allocation with {} closed!", self.five_tuple);
209
210        Ok(())
211    }
212
213    pub async fn start(&mut self, lifetime: Duration) {
214        let (reset_tx, mut reset_rx) = mpsc::channel(1);
215        self.reset_tx = Some(reset_tx);
216
217        let allocations = self.allocations.clone();
218        let five_tuple = self.five_tuple.clone();
219        let timer_expired = Arc::clone(&self.timer_expired);
220
221        tokio::spawn(async move {
222            let timer = tokio::time::sleep(lifetime);
223            tokio::pin!(timer);
224            let mut done = false;
225
226            while !done {
227                tokio::select! {
228                    _ = &mut timer => {
229                        if let Some(allocs) = &allocations{
230                            let mut alls = allocs.lock().await;
231                            if let Some(a) = alls.remove(&five_tuple.fingerprint()) {
232                                let mut a = a.lock().await;
233                                let _ = a.close().await;
234                            }
235                        }
236                        done = true;
237                    },
238                    result = reset_rx.recv() => {
239                        if let Some(d) = result {
240                            timer.as_mut().reset(Instant::now() + d);
241                        } else {
242                            done = true;
243                        }
244                    },
245                }
246            }
247
248            timer_expired.store(true, Ordering::SeqCst);
249        });
250    }
251
252    pub fn stop(&mut self) -> bool {
253        let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
254        self.reset_tx.take();
255        expired
256    }
257
258    // Refresh updates the allocations lifetime
259    pub async fn refresh(&self, lifetime: Duration) {
260        if let Some(tx) = &self.reset_tx {
261            let _ = tx.send(lifetime).await;
262        }
263    }
264
265    //  https://tools.ietf.org/html/rfc5766#section-10.3
266    //  When the server receives a UDP datagram at a currently allocated
267    //  relayed transport address, the server looks up the allocation
268    //  associated with the relayed transport address.  The server then
269    //  checks to see whether the set of permissions for the allocation allow
270    //  the relaying of the UDP datagram as described in Section 8.
271    //
272    //  If relaying is permitted, then the server checks if there is a
273    //  channel bound to the peer that sent the UDP datagram (see
274    //  Section 11).  If a channel is bound, then processing proceeds as
275    //  described in Section 11.7.
276    //
277    //  If relaying is permitted but no channel is bound to the peer, then
278    //  the server forms and sends a Data indication.  The Data indication
279    //  MUST contain both an XOR-PEER-ADDRESS and a DATA attribute.  The DATA
280    //  attribute is set to the value of the 'data octets' field from the
281    //  datagram, and the XOR-PEER-ADDRESS attribute is set to the source
282    //  transport address of the received UDP datagram.  The Data indication
283    //  is then sent on the 5-tuple associated with the allocation.
284    async fn packet_handler(&self) {
285        let five_tuple = self.five_tuple.clone();
286        let relay_addr = self.relay_addr;
287        let relay_socket = Arc::clone(&self.relay_socket);
288        let turn_socket = Arc::clone(&self.turn_socket);
289        let allocations = self.allocations.clone();
290        let channel_bindings = Arc::clone(&self.channel_bindings);
291        let permissions = Arc::clone(&self.permissions);
292
293        tokio::spawn(async move {
294            let mut buffer = vec![0u8; RTP_MTU];
295
296            loop {
297                let (n, src_addr) = match relay_socket.recv_from(&mut buffer).await {
298                    Ok((n, src_addr)) => (n, src_addr),
299                    Err(_) => {
300                        if let Some(allocs) = &allocations {
301                            let mut alls = allocs.lock().await;
302                            alls.remove(&five_tuple.fingerprint());
303                        }
304                        break;
305                    }
306                };
307
308                log::debug!(
309                    "relay socket {:?} received {} bytes from {}",
310                    relay_socket.local_addr().await,
311                    n,
312                    src_addr
313                );
314
315                let cb_number = {
316                    let mut cb_number = None;
317                    let cbs = channel_bindings.lock().await;
318                    for cb in cbs.values() {
319                        if cb.peer == src_addr {
320                            cb_number = Some(cb.number);
321                            break;
322                        }
323                    }
324                    cb_number
325                };
326
327                if let Some(number) = cb_number {
328                    let mut channel_data = ChannelData {
329                        data: buffer[..n].to_vec(),
330                        number,
331                        raw: vec![],
332                    };
333                    channel_data.encode();
334
335                    if let Err(err) = turn_socket
336                        .send_to(&channel_data.raw, five_tuple.src_addr)
337                        .await
338                    {
339                        log::error!(
340                            "Failed to send ChannelData from allocation {} {}",
341                            src_addr,
342                            err
343                        );
344                    }
345                } else {
346                    let exist = {
347                        let ps = permissions.lock().await;
348                        ps.get(&addr2ipfingerprint(&src_addr)).is_some()
349                    };
350
351                    if exist {
352                        let msg = {
353                            let peer_address_attr = PeerAddress {
354                                ip: src_addr.ip(),
355                                port: src_addr.port(),
356                            };
357                            let data_attr = Data(buffer[..n].to_vec());
358
359                            let mut msg = Message::new();
360                            if let Err(err) = msg.build(&[
361                                Box::new(TransactionId::new()),
362                                Box::new(MessageType::new(METHOD_DATA, CLASS_INDICATION)),
363                                Box::new(peer_address_attr),
364                                Box::new(data_attr),
365                            ]) {
366                                log::error!(
367                                    "Failed to send DataIndication from allocation {} {}",
368                                    src_addr,
369                                    err
370                                );
371                                None
372                            } else {
373                                Some(msg)
374                            }
375                        };
376
377                        if let Some(msg) = msg {
378                            log::debug!(
379                                "relaying message from {} to client at {}",
380                                src_addr,
381                                five_tuple.src_addr
382                            );
383                            if let Err(err) =
384                                turn_socket.send_to(&msg.raw, five_tuple.src_addr).await
385                            {
386                                log::error!(
387                                    "Failed to send DataIndication from allocation {} {}",
388                                    src_addr,
389                                    err
390                                );
391                            }
392                        }
393                    } else {
394                        log::info!(
395                            "No Permission or Channel exists for {} on allocation {}",
396                            src_addr,
397                            relay_addr
398                        );
399                    }
400                }
401            }
402        });
403    }
404}