webrtc_turn/client/
mod.rs

1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod periodic_timer;
6pub mod permission;
7pub mod relay_conn;
8pub mod transaction;
9
10use crate::errors::*;
11use crate::proto::{
12    chandata::*, data::*, lifetime::*, peeraddr::*, relayaddr::*, reqtrans::*, PROTO_UDP,
13};
14use binding::*;
15use relay_conn::*;
16use transaction::*;
17
18use stun::agent::*;
19use stun::attributes::*;
20use stun::error_code::*;
21use stun::fingerprint::*;
22use stun::integrity::*;
23use stun::message::*;
24use stun::textattrs::*;
25use stun::xoraddr::*;
26
27use std::sync::Arc;
28
29use std::net::SocketAddr;
30use std::str::FromStr;
31use tokio::sync::{mpsc, Mutex};
32use util::{conn::*, vnet::net::*, Error};
33
34use async_trait::async_trait;
35
36const DEFAULT_RTO_IN_MS: u16 = 200;
37const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize; // message size limit for Chromium
38const MAX_READ_QUEUE_SIZE: usize = 1024;
39
40//              interval [msec]
41// 0: 0 ms      +500
42// 1: 500 ms	+1000
43// 2: 1500 ms   +2000
44// 3: 3500 ms   +4000
45// 4: 7500 ms   +8000
46// 5: 15500 ms  +16000
47// 6: 31500 ms  +32000
48// -: 63500 ms  failed
49
50// ClientConfig is a bag of config parameters for Client.
51pub struct ClientConfig {
52    pub stun_serv_addr: String, // STUN server address (e.g. "stun.abc.com:3478")
53    pub turn_serv_addr: String, // TURN server addrees (e.g. "turn.abc.com:3478")
54    pub username: String,
55    pub password: String,
56    pub realm: String,
57    pub software: String,
58    pub rto_in_ms: u16,
59    pub conn: Arc<dyn Conn + Send + Sync>,
60    pub vnet: Option<Arc<Net>>,
61}
62
63struct ClientInternal {
64    conn: Arc<dyn Conn + Send + Sync>,
65    stun_serv_addr: String,
66    turn_serv_addr: String,
67    username: Username,
68    password: String,
69    realm: Realm,
70    integrity: MessageIntegrity,
71    software: Software,
72    tr_map: Arc<Mutex<TransactionMap>>,
73    binding_mgr: Arc<Mutex<BindingManager>>,
74    rto_in_ms: u16,
75    read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
76}
77
78#[async_trait]
79impl RelayConnObserver for ClientInternal {
80    // turn_server_addr return the TURN server address
81    fn turn_server_addr(&self) -> String {
82        self.turn_serv_addr.clone()
83    }
84
85    // username returns username
86    fn username(&self) -> Username {
87        self.username.clone()
88    }
89
90    // realm return realm
91    fn realm(&self) -> Realm {
92        self.realm.clone()
93    }
94
95    // WriteTo sends data to the specified destination using the base socket.
96    async fn write_to(&self, data: &[u8], to: &str) -> Result<usize, Error> {
97        let n = self.conn.send_to(data, SocketAddr::from_str(to)?).await?;
98        Ok(n)
99    }
100
101    // PerformTransaction performs STUN transaction
102    async fn perform_transaction(
103        &mut self,
104        msg: &Message,
105        to: &str,
106        ignore_result: bool,
107    ) -> Result<TransactionResult, Error> {
108        let tr_key = base64::encode(&msg.transaction_id.0);
109
110        let mut tr = Transaction::new(TransactionConfig {
111            key: tr_key.clone(),
112            raw: msg.raw.clone(),
113            to: to.to_string(),
114            interval: self.rto_in_ms,
115            ignore_result,
116        });
117        let result_ch_rx = tr.get_result_channel();
118
119        log::trace!("start {} transaction {} to {}", msg.typ, tr_key, tr.to);
120        {
121            let mut tm = self.tr_map.lock().await;
122            tm.insert(tr_key.clone(), tr);
123        }
124
125        self.conn
126            .send_to(&msg.raw, SocketAddr::from_str(to)?)
127            .await?;
128
129        let conn2 = Arc::clone(&self.conn);
130        let tr_map2 = Arc::clone(&self.tr_map);
131        {
132            let mut tm = self.tr_map.lock().await;
133            if let Some(tr) = tm.get(&tr_key) {
134                tr.start_rtx_timer(conn2, tr_map2).await;
135            }
136        }
137
138        // If dontWait is true, get the transaction going and return immediately
139        if ignore_result {
140            return Ok(TransactionResult::default());
141        }
142
143        // wait_for_result waits for the transaction result
144        if let Some(mut result_ch_rx) = result_ch_rx {
145            match result_ch_rx.recv().await {
146                Some(tr) => Ok(tr),
147                None => Err(ERR_TRANSACTION_CLOSED.to_owned()),
148            }
149        } else {
150            Err(ERR_WAIT_FOR_RESULT_ON_NON_RESULT_TRANSACTION.to_owned())
151        }
152    }
153}
154
155impl ClientInternal {
156    // new returns a new Client instance. listeningAddress is the address and port to listen on, default "0.0.0.0:0"
157    async fn new(config: ClientConfig) -> Result<Self, Error> {
158        let net = if let Some(vnet) = config.vnet {
159            if vnet.is_virtual() {
160                log::warn!("vnet is enabled");
161            }
162            vnet
163        } else {
164            Arc::new(Net::new(None))
165        };
166
167        let stun_serv_addr = if config.stun_serv_addr.is_empty() {
168            String::new()
169        } else {
170            log::debug!("resolving {}", config.stun_serv_addr);
171            let local_addr = config.conn.local_addr().await?;
172            let stun_serv = net
173                .resolve_addr(local_addr.is_ipv4(), &config.stun_serv_addr)
174                .await?;
175            log::debug!("stunServ: {}", stun_serv);
176            stun_serv.to_string()
177        };
178
179        let turn_serv_addr = if config.turn_serv_addr.is_empty() {
180            String::new()
181        } else {
182            log::debug!("resolving {}", config.turn_serv_addr);
183            let local_addr = config.conn.local_addr().await?;
184            let turn_serv = net
185                .resolve_addr(local_addr.is_ipv4(), &config.turn_serv_addr)
186                .await?;
187            log::debug!("turnServ: {}", turn_serv);
188            turn_serv.to_string()
189        };
190
191        Ok(ClientInternal {
192            conn: Arc::clone(&config.conn),
193            stun_serv_addr,
194            turn_serv_addr,
195            username: Username::new(ATTR_USERNAME, config.username),
196            password: config.password,
197            realm: Realm::new(ATTR_REALM, config.realm),
198            software: Software::new(ATTR_SOFTWARE, config.software),
199            tr_map: Arc::new(Mutex::new(TransactionMap::new())),
200            binding_mgr: Arc::new(Mutex::new(BindingManager::new())),
201            rto_in_ms: if config.rto_in_ms != 0 {
202                config.rto_in_ms
203            } else {
204                DEFAULT_RTO_IN_MS
205            },
206            integrity: MessageIntegrity::new_short_term_integrity(String::new()),
207            read_ch_tx: Arc::new(Mutex::new(None)),
208        })
209    }
210
211    // stun_server_addr return the STUN server address
212    fn stun_server_addr(&self) -> String {
213        self.stun_serv_addr.clone()
214    }
215
216    // Listen will have this client start listening on the relay_conn provided via the config.
217    // This is optional. If not used, you will need to call handle_inbound method
218    // to supply incoming data, instead.
219    async fn listen(&self) -> Result<(), Error> {
220        let conn = Arc::clone(&self.conn);
221        let stun_serv_str = self.stun_serv_addr.clone();
222        let tr_map = Arc::clone(&self.tr_map);
223        let read_ch_tx = Arc::clone(&self.read_ch_tx);
224        let binding_mgr = Arc::clone(&self.binding_mgr);
225
226        tokio::spawn(async move {
227            let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE];
228            loop {
229                //TODO: gracefully exit loop
230                let (n, from) = match conn.recv_from(&mut buf).await {
231                    Ok((n, from)) => (n, from),
232                    Err(err) => {
233                        log::debug!("exiting read loop: {}", err);
234                        break;
235                    }
236                };
237
238                log::debug!("received {} bytes of udp from {}", n, from);
239
240                if let Err(err) = ClientInternal::handle_inbound(
241                    &read_ch_tx,
242                    &buf[..n],
243                    from,
244                    &stun_serv_str,
245                    &tr_map,
246                    &binding_mgr,
247                )
248                .await
249                {
250                    log::debug!("exiting read loop: {}", err);
251                    break;
252                }
253            }
254        });
255
256        Ok(())
257    }
258
259    // handle_inbound handles data received.
260    // This method handles incoming packet demultiplex it by the source address
261    // and the types of the message.
262    // This return a booleen (handled or not) and if there was an error.
263    // Caller should check if the packet was handled by this client or not.
264    // If not handled, it is assumed that the packet is application data.
265    // If an error is returned, the caller should discard the packet regardless.
266    async fn handle_inbound(
267        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
268        data: &[u8],
269        from: SocketAddr,
270        stun_serv_str: &str,
271        tr_map: &Arc<Mutex<TransactionMap>>,
272        binding_mgr: &Arc<Mutex<BindingManager>>,
273    ) -> Result<(), Error> {
274        // +-------------------+-------------------------------+
275        // |   Return Values   |                               |
276        // +-------------------+       Meaning / Action        |
277        // | handled |  error  |                               |
278        // |=========+=========+===============================+
279        // |  false  |   nil   | Handle the packet as app data |
280        // |---------+---------+-------------------------------+
281        // |  true   |   nil   |        Nothing to do          |
282        // |---------+---------+-------------------------------+
283        // |  false  |  error  |     (shouldn't happen)        |
284        // |---------+---------+-------------------------------+
285        // |  true   |  error  | Error occurred while handling |
286        // +---------+---------+-------------------------------+
287        // Possible causes of the error:
288        //  - Malformed packet (parse error)
289        //  - STUN message was a request
290        //  - Non-STUN message from the STUN server
291
292        if is_message(data) {
293            ClientInternal::handle_stun_message(tr_map, read_ch_tx, data, from).await
294        } else if ChannelData::is_channel_data(data) {
295            ClientInternal::handle_channel_data(binding_mgr, read_ch_tx, data).await
296        } else if !stun_serv_str.is_empty() && from.to_string() == *stun_serv_str {
297            // received from STUN server but it is not a STUN message
298            Err(ERR_NON_STUNMESSAGE.to_owned())
299        } else {
300            // assume, this is an application data
301            log::trace!("non-STUN/TURN packect, unhandled");
302            Ok(())
303        }
304    }
305
306    async fn handle_stun_message(
307        tr_map: &Arc<Mutex<TransactionMap>>,
308        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
309        data: &[u8],
310        mut from: SocketAddr,
311    ) -> Result<(), Error> {
312        let mut msg = Message::new();
313        msg.raw = data.to_vec();
314        msg.decode()?;
315
316        if msg.typ.class == CLASS_REQUEST {
317            return Err(Error::new(format!(
318                "{} : {}",
319                *ERR_UNEXPECTED_STUNREQUEST_MESSAGE, msg
320            )));
321        }
322
323        if msg.typ.class == CLASS_INDICATION {
324            if msg.typ.method == METHOD_DATA {
325                let mut peer_addr = PeerAddress::default();
326                peer_addr.get_from(&msg)?;
327                from = SocketAddr::new(peer_addr.ip, peer_addr.port);
328
329                let mut data = Data::default();
330                data.get_from(&msg)?;
331
332                log::debug!("data indication received from {}", from);
333
334                let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &data.0, from).await;
335            }
336
337            return Ok(());
338        }
339
340        // This is a STUN response message (transactional)
341        // The type is either:
342        // - stun.ClassSuccessResponse
343        // - stun.ClassErrorResponse
344
345        let tr_key = base64::encode(&msg.transaction_id.0);
346
347        let mut tm = tr_map.lock().await;
348        if tm.find(&tr_key).is_none() {
349            // silently discard
350            log::debug!("no transaction for {}", msg);
351            return Ok(());
352        }
353
354        if let Some(mut tr) = tm.delete(&tr_key) {
355            // End the transaction
356            tr.stop_rtx_timer();
357
358            if !tr
359                .write_result(TransactionResult {
360                    msg,
361                    from,
362                    retries: tr.retries(),
363                    ..Default::default()
364                })
365                .await
366            {
367                log::debug!("no listener for msg.raw {:?}", data);
368            }
369        }
370
371        Ok(())
372    }
373
374    async fn handle_channel_data(
375        binding_mgr: &Arc<Mutex<BindingManager>>,
376        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
377        data: &[u8],
378    ) -> Result<(), Error> {
379        let mut ch_data = ChannelData {
380            raw: data.to_vec(),
381            ..Default::default()
382        };
383        ch_data.decode()?;
384
385        let addr = ClientInternal::find_addr_by_channel_number(binding_mgr, ch_data.number.0)
386            .await
387            .ok_or_else(|| ERR_CHANNEL_BIND_NOT_FOUND.to_owned())?;
388
389        log::trace!(
390            "channel data received from {} (ch={})",
391            addr,
392            ch_data.number.0
393        );
394
395        let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &ch_data.data, addr).await;
396
397        Ok(())
398    }
399
400    // handle_inbound_relay_conn passes inbound data in RelayConn
401    async fn handle_inbound_relay_conn(
402        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
403        data: &[u8],
404        from: SocketAddr,
405    ) -> Result<(), Error> {
406        let read_ch_tx_opt = read_ch_tx.lock().await;
407        log::debug!("read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
408        if let Some(tx) = &*read_ch_tx_opt {
409            log::debug!("try_send data = {:?}, from = {}", data, from);
410            if tx
411                .try_send(InboundData {
412                    data: data.to_vec(),
413                    from,
414                })
415                .is_err()
416            {
417                log::warn!("receive buffer full");
418            }
419            Ok(())
420        } else {
421            Err(ERR_ALREADY_CLOSED.to_owned())
422        }
423    }
424
425    // Close closes this client
426    async fn close(&mut self) {
427        {
428            let mut read_ch_tx = self.read_ch_tx.lock().await;
429            read_ch_tx.take();
430        }
431        {
432            let mut tm = self.tr_map.lock().await;
433            tm.close_and_delete_all();
434        }
435    }
436
437    // send_binding_request_to sends a new STUN request to the given transport address
438    async fn send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr, Error> {
439        let msg = {
440            let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
441                vec![
442                    Box::new(TransactionId::new()),
443                    Box::new(BINDING_REQUEST),
444                    Box::new(self.software.clone()),
445                ]
446            } else {
447                vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
448            };
449
450            let mut msg = Message::new();
451            msg.build(&attrs)?;
452            msg
453        };
454
455        log::debug!("client.SendBindingRequestTo call PerformTransaction 1");
456        let tr_res = self.perform_transaction(&msg, to, false).await?;
457
458        let mut refl_addr = XORMappedAddress::default();
459        refl_addr.get_from(&tr_res.msg)?;
460
461        Ok(SocketAddr::new(refl_addr.ip, refl_addr.port))
462    }
463
464    // send_binding_request sends a new STUN request to the STUN server
465    async fn send_binding_request(&mut self) -> Result<SocketAddr, Error> {
466        if self.stun_serv_addr.is_empty() {
467            Err(ERR_STUNSERVER_ADDRESS_NOT_SET.to_owned())
468        } else {
469            self.send_binding_request_to(&self.stun_serv_addr.clone())
470                .await
471        }
472    }
473
474    // find_addr_by_channel_number returns a peer address associated with the
475    // channel number on this UDPConn
476    async fn find_addr_by_channel_number(
477        binding_mgr: &Arc<Mutex<BindingManager>>,
478        ch_num: u16,
479    ) -> Option<SocketAddr> {
480        let bm = binding_mgr.lock().await;
481        if let Some(b) = bm.find_by_number(ch_num) {
482            Some(b.addr)
483        } else {
484            None
485        }
486    }
487
488    // Allocate sends a TURN allocation request to the given transport address
489    async fn allocate(&mut self) -> Result<RelayConnConfig, Error> {
490        {
491            let read_ch_tx = self.read_ch_tx.lock().await;
492            log::debug!("allocate check: read_ch_tx_opt = {}", read_ch_tx.is_some());
493            if read_ch_tx.is_some() {
494                return Err(ERR_ONE_ALLOCATE_ONLY.to_owned());
495            }
496        }
497
498        let mut msg = Message::new();
499        msg.build(&[
500            Box::new(TransactionId::new()),
501            Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
502            Box::new(RequestedTransport {
503                protocol: PROTO_UDP,
504            }),
505            Box::new(FINGERPRINT),
506        ])?;
507
508        log::debug!("client.Allocate call PerformTransaction 1");
509        let tr_res = self
510            .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
511            .await?;
512        let res = tr_res.msg;
513
514        // Anonymous allocate failed, trying to authenticate.
515        let nonce = Nonce::get_from_as(&res, ATTR_NONCE)?;
516        self.realm = Realm::get_from_as(&res, ATTR_REALM)?;
517
518        self.integrity = MessageIntegrity::new_long_term_integrity(
519            self.username.text.clone(),
520            self.realm.text.clone(),
521            self.password.clone(),
522        );
523
524        // Trying to authorize.
525        msg.build(&[
526            Box::new(TransactionId::new()),
527            Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
528            Box::new(RequestedTransport {
529                protocol: PROTO_UDP,
530            }),
531            Box::new(self.username.clone()),
532            Box::new(self.realm.clone()),
533            Box::new(nonce.clone()),
534            Box::new(self.integrity.clone()),
535            Box::new(FINGERPRINT),
536        ])?;
537
538        log::debug!("client.Allocate call PerformTransaction 2");
539        let tr_res = self
540            .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
541            .await?;
542        let res = tr_res.msg;
543
544        if res.typ.class == CLASS_ERROR_RESPONSE {
545            let mut code = ErrorCodeAttribute::default();
546            let result = code.get_from(&res);
547            if result.is_err() {
548                return Err(Error::new(format!("{}", res.typ)));
549            } else {
550                return Err(Error::new(format!("{} (error {})", res.typ, code)));
551            }
552        }
553
554        // Getting relayed addresses from response.
555        let mut relayed = RelayedAddress::default();
556        relayed.get_from(&res)?;
557        let relayed_addr = SocketAddr::new(relayed.ip, relayed.port);
558
559        // Getting lifetime from response
560        let mut lifetime = Lifetime::default();
561        lifetime.get_from(&res)?;
562
563        let (read_ch_tx, read_ch_rx) = mpsc::channel(MAX_READ_QUEUE_SIZE);
564        {
565            let mut read_ch_tx_opt = self.read_ch_tx.lock().await;
566            *read_ch_tx_opt = Some(read_ch_tx);
567            log::debug!("allocate: read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
568        }
569
570        Ok(RelayConnConfig {
571            relayed_addr,
572            integrity: self.integrity.clone(),
573            nonce,
574            lifetime: lifetime.0,
575            binding_mgr: Arc::clone(&self.binding_mgr),
576            read_ch_rx: Arc::new(Mutex::new(read_ch_rx)),
577        })
578    }
579}
580
581// Client is a STUN server client
582#[derive(Clone)]
583pub struct Client {
584    client_internal: Arc<Mutex<ClientInternal>>,
585}
586
587impl Client {
588    pub async fn new(config: ClientConfig) -> Result<Self, Error> {
589        let ci = ClientInternal::new(config).await?;
590        Ok(Client {
591            client_internal: Arc::new(Mutex::new(ci)),
592        })
593    }
594
595    pub async fn listen(&self) -> Result<(), Error> {
596        let ci = self.client_internal.lock().await;
597        ci.listen().await
598    }
599
600    pub async fn allocate(&self) -> Result<impl Conn, Error> {
601        let config = {
602            let mut ci = self.client_internal.lock().await;
603            ci.allocate().await?
604        };
605
606        Ok(RelayConn::new(Arc::clone(&self.client_internal), config))
607    }
608
609    pub async fn close(&self) -> Result<(), Error> {
610        let mut ci = self.client_internal.lock().await;
611        ci.close().await;
612        Ok(())
613    }
614
615    // send_binding_request_to sends a new STUN request to the given transport address
616    pub async fn send_binding_request_to(&self, to: &str) -> Result<SocketAddr, Error> {
617        let mut ci = self.client_internal.lock().await;
618        ci.send_binding_request_to(to).await
619    }
620
621    // send_binding_request sends a new STUN request to the STUN server
622    pub async fn send_binding_request(&self) -> Result<SocketAddr, Error> {
623        let mut ci = self.client_internal.lock().await;
624        ci.send_binding_request().await
625    }
626}