Skip to main content

clia_turn/allocation/
mod.rs

1#[cfg(test)]
2mod allocation_test;
3
4pub mod allocation_manager;
5pub mod channel_bind;
6pub mod five_tuple;
7pub mod permission;
8
9use std::collections::HashMap;
10use std::marker::{Send, Sync};
11use std::net::SocketAddr;
12use std::sync::atomic::Ordering;
13use std::sync::{Arc, Weak};
14
15use channel_bind::*;
16use five_tuple::*;
17use permission::*;
18use portable_atomic::{AtomicBool, AtomicUsize};
19use stun::agent::*;
20use stun::message::*;
21use stun::textattrs::Username;
22use tokio::sync::oneshot::{self, Sender};
23use tokio::sync::{mpsc, Mutex};
24use tokio::time::{Duration, Instant};
25use util::sync::Mutex as SyncMutex;
26use util::Conn;
27
28use crate::error::*;
29use crate::proto::chandata::*;
30use crate::proto::channum::*;
31use crate::proto::data::*;
32use crate::proto::peeraddr::*;
33use crate::proto::*;
34
/// Size, in bytes, of the buffer used to receive a single datagram from the
/// relay socket.
const RTP_MTU: usize = 1500;

/// Map from a [`FiveTuple`] to the shared [`Allocation`] registered under it.
pub type AllocationMap = HashMap<FiveTuple, Arc<Allocation>>;
38
/// Information about an [`Allocation`].
///
/// A value of this type is sent on the close-notification channel (when one
/// was supplied) at the moment an [`Allocation`] is closed.
#[derive(Debug, Clone)]
pub struct AllocationInfo {
    /// [`FiveTuple`] of this [`Allocation`].
    pub five_tuple: FiveTuple,

    /// Username of this [`Allocation`].
    pub username: String,

    /// Relay address of this [`Allocation`].
    pub relay_addr: SocketAddr,

    /// Relayed bytes with this [`Allocation`].
    ///
    /// This field exists only when the `metrics` feature is enabled.
    #[cfg(feature = "metrics")]
    pub relayed_bytes: usize,
}
55
56impl AllocationInfo {
57    /// Creates a new [`AllocationInfo`].
58    pub fn new(
59        five_tuple: FiveTuple,
60        username: String,
61        relay_addr: SocketAddr,
62        #[cfg(feature = "metrics")] relayed_bytes: usize,
63    ) -> Self {
64        Self {
65            five_tuple,
66            username,
67            relay_addr,
68            #[cfg(feature = "metrics")]
69            relayed_bytes,
70        }
71    }
72}
73
/// `Allocation` is tied to a [`FiveTuple`] and relays traffic.
///
/// Use `create_allocation` and `get_allocation` to operate on allocations.
/// NOTE(review): neither function is defined in this file — presumably they
/// live in `allocation_manager`; confirm.
pub struct Allocation {
    /// Transport protocol of the allocation; [`Allocation::new`] always sets
    /// it to `PROTO_UDP`.
    protocol: Protocol,
    /// Socket used to send relayed data to the client at
    /// `five_tuple.src_addr`; closed when the allocation is closed.
    turn_socket: Arc<dyn Conn + Send + Sync>,
    /// Address of the relay socket; reported in [`AllocationInfo`] and logs.
    pub(crate) relay_addr: SocketAddr,
    /// Socket on which the packet handler receives datagrams from peers;
    /// closed when the allocation is closed.
    pub(crate) relay_socket: Arc<dyn Conn + Send + Sync>,
    /// Key under which this allocation is looked up in the [`AllocationMap`].
    five_tuple: FiveTuple,
    /// Username of the allocation; its text is reported in
    /// [`AllocationInfo`] on close.
    username: Username,
    /// Installed permissions, keyed by the peer's IP address rendered as a
    /// string (the port is not part of the key).
    permissions: Arc<Mutex<HashMap<String, Permission>>>,
    /// Installed channel bindings, keyed by channel number.
    channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
    /// Weak handle to the map holding allocations; used to remove this
    /// allocation's entry when its lifetime timer fires or the relay socket
    /// fails to receive.
    allocations: Weak<Mutex<AllocationMap>>,
    /// Sender used to reset the lifetime timer; `None` until `start` is
    /// called and again after `stop`.
    reset_tx: SyncMutex<Option<mpsc::Sender<Duration>>>,
    /// Set to `true` once the lifetime-timer task has left its loop.
    timer_expired: Arc<AtomicBool>,
    /// Set by `close`; a later `close` call returns `ErrClosed`.
    closed: AtomicBool,
    /// Byte counter reported as `AllocationInfo::relayed_bytes` when the
    /// `metrics` feature is enabled.
    pub(crate) relayed_bytes: AtomicUsize,
    /// Sender half of the oneshot channel whose receiver stops the
    /// packet-handler task; `None` until `packet_handler` installs it.
    drop_tx: Option<Sender<u32>>,
    /// Optional channel that receives an [`AllocationInfo`] when the
    /// allocation is closed.
    alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
}
93
/// Renders the IP part of `addr` as a string.
///
/// The port is deliberately left out, so two addresses that differ only in
/// port yield the same fingerprint.
fn addr2ipfingerprint(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    ip.to_string()
}
97
98impl Allocation {
99    /// Creates a new [`Allocation`].
100    pub fn new(
101        turn_socket: Arc<dyn Conn + Send + Sync>,
102        relay_socket: Arc<dyn Conn + Send + Sync>,
103        relay_addr: SocketAddr,
104        five_tuple: FiveTuple,
105        username: Username,
106        allocation_map: Weak<Mutex<AllocationMap>>,
107        alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
108    ) -> Self {
109        Allocation {
110            protocol: PROTO_UDP,
111            turn_socket,
112            relay_addr,
113            relay_socket,
114            five_tuple,
115            username,
116            permissions: Arc::new(Mutex::new(HashMap::new())),
117            channel_bindings: Arc::new(Mutex::new(HashMap::new())),
118            allocations: allocation_map,
119            reset_tx: SyncMutex::new(None),
120            timer_expired: Arc::new(AtomicBool::new(false)),
121            closed: AtomicBool::new(false),
122            relayed_bytes: Default::default(),
123            drop_tx: None,
124            alloc_close_notify,
125        }
126    }
127
128    /// Checks the Permission for the `addr`.
129    pub async fn has_permission(&self, addr: &SocketAddr) -> bool {
130        let permissions = self.permissions.lock().await;
131        permissions.get(&addr2ipfingerprint(addr)).is_some()
132    }
133
134    /// Adds a new [`Permission`] to this [`Allocation`].
135    pub async fn add_permission(&self, mut p: Permission) {
136        let fingerprint = addr2ipfingerprint(&p.addr);
137
138        {
139            let permissions = self.permissions.lock().await;
140            if let Some(existed_permission) = permissions.get(&fingerprint) {
141                existed_permission.refresh(PERMISSION_TIMEOUT).await;
142                return;
143            }
144        }
145
146        p.permissions = Some(Arc::downgrade(&self.permissions));
147        p.start(PERMISSION_TIMEOUT).await;
148
149        {
150            let mut permissions = self.permissions.lock().await;
151            permissions.insert(fingerprint, p);
152        }
153    }
154
155    /// Removes the `addr`'s fingerprint from this [`Allocation`]'s permissions.
156    pub async fn remove_permission(&self, addr: &SocketAddr) -> bool {
157        let mut permissions = self.permissions.lock().await;
158        permissions.remove(&addr2ipfingerprint(addr)).is_some()
159    }
160
161    /// Adds a new [`ChannelBind`] to this [`Allocation`], it also updates the
162    /// permissions needed for this [`ChannelBind`].
163    pub async fn add_channel_bind(&self, mut c: ChannelBind, lifetime: Duration) -> Result<()> {
164        {
165            if let Some(addr) = self.get_channel_addr(&c.number).await {
166                if addr != c.peer {
167                    return Err(Error::ErrSameChannelDifferentPeer);
168                }
169            }
170
171            if let Some(number) = self.get_channel_number(&c.peer).await {
172                if number != c.number {
173                    return Err(Error::ErrSameChannelDifferentPeer);
174                }
175            }
176        }
177
178        {
179            let channel_bindings = self.channel_bindings.lock().await;
180            if let Some(cb) = channel_bindings.get(&c.number) {
181                cb.refresh(lifetime).await;
182
183                // Channel binds also refresh permissions.
184                self.add_permission(Permission::new(cb.peer)).await;
185
186                return Ok(());
187            }
188        }
189
190        let peer = c.peer;
191
192        // Add or refresh this channel.
193        c.channel_bindings = Some(Arc::downgrade(&self.channel_bindings));
194        c.start(lifetime).await;
195
196        {
197            let mut channel_bindings = self.channel_bindings.lock().await;
198            channel_bindings.insert(c.number, c);
199        }
200
201        // Channel binds also refresh permissions.
202        self.add_permission(Permission::new(peer)).await;
203
204        Ok(())
205    }
206
207    /// Removes the [`ChannelBind`] from this [`Allocation`] by `number`.
208    pub async fn remove_channel_bind(&self, number: ChannelNumber) -> bool {
209        let mut channel_bindings = self.channel_bindings.lock().await;
210        channel_bindings.remove(&number).is_some()
211    }
212
213    /// Gets the [`ChannelBind`]'s address by `number`.
214    pub async fn get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr> {
215        let channel_bindings = self.channel_bindings.lock().await;
216        channel_bindings.get(number).map(|cb| cb.peer)
217    }
218
219    /// Gets the [`ChannelBind`]'s number from this [`Allocation`] by `addr`.
220    pub async fn get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber> {
221        let channel_bindings = self.channel_bindings.lock().await;
222        for cb in channel_bindings.values() {
223            if cb.peer == *addr {
224                return Some(cb.number);
225            }
226        }
227        None
228    }
229
230    /// Closes the [`Allocation`].
231    pub async fn close(&self) -> Result<()> {
232        if self.closed.load(Ordering::Acquire) {
233            return Err(Error::ErrClosed);
234        }
235
236        self.closed.store(true, Ordering::Release);
237        self.stop();
238
239        {
240            let mut permissions = self.permissions.lock().await;
241            for p in permissions.values_mut() {
242                p.stop();
243            }
244        }
245
246        {
247            let mut channel_bindings = self.channel_bindings.lock().await;
248            for c in channel_bindings.values_mut() {
249                c.stop();
250            }
251        }
252
253        log::trace!("allocation with {} closed!", self.five_tuple);
254
255        let _ = self.turn_socket.close().await;
256        let _ = self.relay_socket.close().await;
257
258        if let Some(notify_tx) = &self.alloc_close_notify {
259            let _ = notify_tx
260                .send(AllocationInfo {
261                    five_tuple: self.five_tuple,
262                    username: self.username.text.clone(),
263                    relay_addr: self.relay_addr,
264                    #[cfg(feature = "metrics")]
265                    relayed_bytes: self.relayed_bytes.load(Ordering::Acquire),
266                })
267                .await;
268        }
269
270        Ok(())
271    }
272
273    pub async fn start(&self, lifetime: Duration) {
274        let (reset_tx, mut reset_rx) = mpsc::channel(1);
275        self.reset_tx.lock().replace(reset_tx);
276
277        let allocations = self.allocations.clone();
278        let five_tuple = self.five_tuple;
279        let timer_expired = Arc::clone(&self.timer_expired);
280
281        tokio::spawn(async move {
282            let timer = tokio::time::sleep(lifetime);
283            tokio::pin!(timer);
284            let mut done = false;
285
286            while !done {
287                tokio::select! {
288                    _ = &mut timer => {
289                        if let Some(allocs) = &allocations.upgrade(){
290                            let mut allocs = allocs.lock().await;
291                            if let Some(a) = allocs.remove(&five_tuple) {
292                                let _ = a.close().await;
293                            }
294                        }
295                        done = true;
296                    },
297                    result = reset_rx.recv() => {
298                        if let Some(d) = result {
299                            timer.as_mut().reset(Instant::now() + d);
300                        } else {
301                            done = true;
302                        }
303                    },
304                }
305            }
306
307            timer_expired.store(true, Ordering::SeqCst);
308        });
309    }
310
311    fn stop(&self) -> bool {
312        let reset_tx = self.reset_tx.lock().take();
313        reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst)
314    }
315
316    /// Updates the allocations lifetime.
317    pub async fn refresh(&self, lifetime: Duration) {
318        let reset_tx = self.reset_tx.lock().clone();
319        if let Some(tx) = reset_tx {
320            let _ = tx.send(lifetime).await;
321        }
322    }
323
324    //  https://tools.ietf.org/html/rfc5766#section-10.3
325    //  When the server receives a UDP datagram at a currently allocated
326    //  relayed transport address, the server looks up the allocation
327    //  associated with the relayed transport address.  The server then
328    //  checks to see whether the set of permissions for the allocation allow
329    //  the relaying of the UDP datagram as described in Section 8.
330    //
331    //  If relaying is permitted, then the server checks if there is a
332    //  channel bound to the peer that sent the UDP datagram (see
333    //  Section 11).  If a channel is bound, then processing proceeds as
334    //  described in Section 11.7.
335    //
336    //  If relaying is permitted but no channel is bound to the peer, then
337    //  the server forms and sends a Data indication.  The Data indication
338    //  MUST contain both an XOR-PEER-ADDRESS and a DATA attribute.  The DATA
339    //  attribute is set to the value of the 'data octets' field from the
340    //  datagram, and the XOR-PEER-ADDRESS attribute is set to the source
341    //  transport address of the received UDP datagram.  The Data indication
342    //  is then sent on the 5-tuple associated with the allocation.
343    async fn packet_handler(&mut self) {
344        let five_tuple = self.five_tuple;
345        let relay_addr = self.relay_addr;
346        let relay_socket = Arc::clone(&self.relay_socket);
347        let turn_socket = Arc::clone(&self.turn_socket);
348        let allocations = self.allocations.clone();
349        let channel_bindings = Arc::clone(&self.channel_bindings);
350        let permissions = Arc::clone(&self.permissions);
351        let (drop_tx, drop_rx) = oneshot::channel::<u32>();
352        self.drop_tx = Some(drop_tx);
353
354        tokio::spawn(async move {
355            let mut buffer = vec![0u8; RTP_MTU];
356
357            tokio::pin!(drop_rx);
358
359            loop {
360                let (n, src_addr) = tokio::select! {
361                    result = relay_socket.recv_from(&mut buffer) => {
362                        match result {
363                            Ok((n, src_addr)) => (n, src_addr),
364                            Err(_) => {
365                                if let Some(allocs) = &allocations.upgrade() {
366                                    let mut allocs = allocs.lock().await;
367                                    allocs.remove(&five_tuple);
368                                }
369                                break;
370                            }
371                        }
372                    }
373                    _ = drop_rx.as_mut() => {
374                        log::trace!("allocation has stopped, stop packet_handler. five_tuple: {:?}", five_tuple);
375                        break;
376                    }
377                };
378
379                log::debug!(
380                    "relay socket {:?} received {} bytes from {}",
381                    relay_socket.local_addr(),
382                    n,
383                    src_addr
384                );
385
386                let cb_number = {
387                    let mut cb_number = None;
388                    let cbs = channel_bindings.lock().await;
389                    for cb in cbs.values() {
390                        if cb.peer == src_addr {
391                            cb_number = Some(cb.number);
392                            break;
393                        }
394                    }
395                    cb_number
396                };
397
398                if let Some(number) = cb_number {
399                    let mut channel_data = ChannelData {
400                        data: buffer[..n].to_vec(),
401                        number,
402                        raw: vec![],
403                    };
404                    channel_data.encode();
405
406                    if let Err(err) = turn_socket
407                        .send_to(&channel_data.raw, five_tuple.src_addr)
408                        .await
409                    {
410                        log::error!(
411                            "Failed to send ChannelData from allocation {} {}",
412                            src_addr,
413                            err
414                        );
415                    }
416                } else {
417                    let exist = {
418                        let ps = permissions.lock().await;
419                        ps.get(&addr2ipfingerprint(&src_addr)).is_some()
420                    };
421
422                    if exist {
423                        let msg = {
424                            let peer_address_attr = PeerAddress {
425                                ip: src_addr.ip(),
426                                port: src_addr.port(),
427                            };
428                            let data_attr = Data(buffer[..n].to_vec());
429
430                            let mut msg = Message::new();
431                            if let Err(err) = msg.build(&[
432                                Box::new(TransactionId::new()),
433                                Box::new(MessageType::new(METHOD_DATA, CLASS_INDICATION)),
434                                Box::new(peer_address_attr),
435                                Box::new(data_attr),
436                            ]) {
437                                log::error!(
438                                    "Failed to send DataIndication from allocation {} {}",
439                                    src_addr,
440                                    err
441                                );
442                                None
443                            } else {
444                                Some(msg)
445                            }
446                        };
447
448                        if let Some(msg) = msg {
449                            log::debug!(
450                                "relaying message from {} to client at {}",
451                                src_addr,
452                                five_tuple.src_addr
453                            );
454                            if let Err(err) =
455                                turn_socket.send_to(&msg.raw, five_tuple.src_addr).await
456                            {
457                                log::error!(
458                                    "Failed to send DataIndication from allocation {} {}",
459                                    src_addr,
460                                    err
461                                );
462                            }
463                        }
464                    } else {
465                        log::info!(
466                            "No Permission or Channel exists for {} on allocation {}",
467                            src_addr,
468                            relay_addr
469                        );
470                    }
471                }
472            }
473        });
474    }
475}