Skip to main content

clia_turn/client/
mod.rs

1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod periodic_timer;
6pub mod permission;
7pub mod relay_conn;
8pub mod transaction;
9
10use std::net::SocketAddr;
11use std::str::FromStr;
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use base64::prelude::BASE64_STANDARD;
16use base64::Engine;
17use binding::*;
18use relay_conn::*;
19use stun::agent::*;
20use stun::attributes::*;
21use stun::error_code::*;
22use stun::fingerprint::*;
23use stun::integrity::*;
24use stun::message::*;
25use stun::textattrs::*;
26use stun::xoraddr::*;
27use tokio::pin;
28use tokio::select;
29use tokio::sync::{mpsc, Mutex};
30use tokio_util::sync::CancellationToken;
31use transaction::*;
32use util::conn::*;
33use util::vnet::net::*;
34
35use crate::error::*;
36use crate::proto::chandata::*;
37use crate::proto::data::*;
38use crate::proto::lifetime::*;
39use crate::proto::peeraddr::*;
40use crate::proto::relayaddr::*;
41use crate::proto::reqtrans::*;
42use crate::proto::PROTO_UDP;
43
44const DEFAULT_RTO_IN_MS: u16 = 200;
45const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize; // message size limit for Chromium
46const MAX_READ_QUEUE_SIZE: usize = 1024;
47
48//              interval [msec]
49// 0: 0 ms      +500
50// 1: 500 ms	+1000
51// 2: 1500 ms   +2000
52// 3: 3500 ms   +4000
53// 4: 7500 ms   +8000
54// 5: 15500 ms  +16000
55// 6: 31500 ms  +32000
56// -: 63500 ms  failed
57
58/// ClientConfig is a bag of config parameters for Client.
59pub struct ClientConfig {
60    pub stun_serv_addr: String, // STUN server address (e.g. "stun.abc.com:3478")
61    pub turn_serv_addr: String, // TURN server address (e.g. "turn.abc.com:3478")
62    pub username: String,
63    pub password: String,
64    pub realm: String,
65    pub software: String,
66    pub rto_in_ms: u16,
67    pub conn: Arc<dyn Conn + Send + Sync>,
68    pub vnet: Option<Arc<Net>>,
69}
70
71struct ClientInternal {
72    conn: Arc<dyn Conn + Send + Sync>,
73    stun_serv_addr: String,
74    turn_serv_addr: String,
75    username: Username,
76    password: String,
77    realm: Realm,
78    integrity: MessageIntegrity,
79    software: Software,
80    tr_map: Arc<Mutex<TransactionMap>>,
81    binding_mgr: Arc<Mutex<BindingManager>>,
82    rto_in_ms: u16,
83    read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
84    close_notify: CancellationToken,
85}
86
87#[async_trait]
88impl RelayConnObserver for ClientInternal {
89    /// Returns the TURN server address.
90    fn turn_server_addr(&self) -> String {
91        self.turn_serv_addr.clone()
92    }
93
94    /// Returns the `username`.
95    fn username(&self) -> Username {
96        self.username.clone()
97    }
98
99    /// Return the `realm`.
100    fn realm(&self) -> Realm {
101        self.realm.clone()
102    }
103
104    /// Sends data to the specified destination using the base socket.
105    async fn write_to(&self, data: &[u8], to: &str) -> std::result::Result<usize, util::Error> {
106        let n = self.conn.send_to(data, SocketAddr::from_str(to)?).await?;
107        Ok(n)
108    }
109
110    /// Performs STUN transaction.
111    async fn perform_transaction(
112        &mut self,
113        msg: &Message,
114        to: &str,
115        ignore_result: bool,
116    ) -> Result<TransactionResult> {
117        let tr_key = BASE64_STANDARD.encode(msg.transaction_id.0);
118
119        let mut tr = Transaction::new(TransactionConfig {
120            key: tr_key.clone(),
121            raw: msg.raw.clone(),
122            to: to.to_string(),
123            interval: self.rto_in_ms,
124            ignore_result,
125        });
126        let result_ch_rx = tr.get_result_channel();
127
128        log::trace!("start {} transaction {} to {}", msg.typ, tr_key, tr.to);
129        {
130            let mut tm = self.tr_map.lock().await;
131            tm.insert(tr_key.clone(), tr);
132        }
133
134        self.conn
135            .send_to(&msg.raw, SocketAddr::from_str(to)?)
136            .await?;
137
138        let conn2 = Arc::clone(&self.conn);
139        let tr_map2 = Arc::clone(&self.tr_map);
140        {
141            let mut tm = self.tr_map.lock().await;
142            if let Some(tr) = tm.get(&tr_key) {
143                tr.start_rtx_timer(conn2, tr_map2).await;
144            }
145        }
146
147        // If dontWait is true, get the transaction going and return immediately
148        if ignore_result {
149            return Ok(TransactionResult::default());
150        }
151
152        // wait_for_result waits for the transaction result
153        if let Some(mut result_ch_rx) = result_ch_rx {
154            match result_ch_rx.recv().await {
155                Some(tr) => Ok(tr),
156                None => Err(Error::ErrTransactionClosed),
157            }
158        } else {
159            Err(Error::ErrWaitForResultOnNonResultTransaction)
160        }
161    }
162}
163
164impl ClientInternal {
165    /// Creates a new [`ClientInternal`].
166    async fn new(config: ClientConfig) -> Result<Self> {
167        let net = if let Some(vnet) = config.vnet {
168            if vnet.is_virtual() {
169                log::warn!("vnet is enabled");
170            }
171            vnet
172        } else {
173            Arc::new(Net::new(None))
174        };
175
176        let stun_serv_addr = if config.stun_serv_addr.is_empty() {
177            String::new()
178        } else {
179            log::debug!("resolving {}", config.stun_serv_addr);
180            let local_addr = config.conn.local_addr()?;
181            let stun_serv = net
182                .resolve_addr(local_addr.is_ipv4(), &config.stun_serv_addr)
183                .await?;
184            log::debug!("stunServ: {}", stun_serv);
185            stun_serv.to_string()
186        };
187
188        let turn_serv_addr = if config.turn_serv_addr.is_empty() {
189            String::new()
190        } else {
191            log::debug!("resolving {}", config.turn_serv_addr);
192            let local_addr = config.conn.local_addr()?;
193            let turn_serv = net
194                .resolve_addr(local_addr.is_ipv4(), &config.turn_serv_addr)
195                .await?;
196            log::debug!("turnServ: {}", turn_serv);
197            turn_serv.to_string()
198        };
199
200        Ok(ClientInternal {
201            conn: Arc::clone(&config.conn),
202            stun_serv_addr,
203            turn_serv_addr,
204            username: Username::new(ATTR_USERNAME, config.username),
205            password: config.password,
206            realm: Realm::new(ATTR_REALM, config.realm),
207            software: Software::new(ATTR_SOFTWARE, config.software),
208            tr_map: Arc::new(Mutex::new(TransactionMap::new())),
209            binding_mgr: Arc::new(Mutex::new(BindingManager::new())),
210            rto_in_ms: if config.rto_in_ms != 0 {
211                config.rto_in_ms
212            } else {
213                DEFAULT_RTO_IN_MS
214            },
215            integrity: MessageIntegrity::new_short_term_integrity(String::new()),
216            read_ch_tx: Arc::new(Mutex::new(None)),
217            close_notify: CancellationToken::new(),
218        })
219    }
220
221    /// Returns the STUN server address.
222    fn stun_server_addr(&self) -> String {
223        self.stun_serv_addr.clone()
224    }
225
226    /// `listen()` will have this client start listening on the `relay_conn` provided via the config.
227    /// This is optional. If not used, you will need to call `handle_inbound` method
228    /// to supply incoming data, instead.
229    async fn listen(&self) -> Result<()> {
230        let conn = Arc::clone(&self.conn);
231        let stun_serv_str = self.stun_serv_addr.clone();
232        let tr_map = Arc::clone(&self.tr_map);
233        let read_ch_tx = Arc::clone(&self.read_ch_tx);
234        let binding_mgr = Arc::clone(&self.binding_mgr);
235        let close_notify = self.close_notify.clone();
236
237        tokio::spawn(async move {
238            let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE];
239            let wait_cancel = close_notify.cancelled();
240            pin!(wait_cancel);
241
242            loop {
243                let (n, from) = select! {
244                    biased;
245
246                    _ = &mut wait_cancel => {
247                        log::debug!("exiting read loop");
248                        break;
249                    },
250                    result = conn.recv_from(&mut buf) => match result {
251                        Ok((n, from)) => (n, from),
252                        Err(err) => {
253                            log::debug!("exiting read loop: {}", err);
254                            break;
255                        }
256                    }
257                };
258                log::debug!("received {} bytes of udp from {}", n, from);
259
260                select! {
261                    biased;
262
263                    _ = &mut wait_cancel => {
264                        log::debug!("exiting read loop");
265                        break;
266                    },
267                    result = ClientInternal::handle_inbound(
268                        &read_ch_tx,
269                        &buf[..n],
270                        from,
271                        &stun_serv_str,
272                        &tr_map,
273                        &binding_mgr,
274                    ) => {
275                        if let Err(err) = result {
276                            log::debug!("exiting read loop: {}", err);
277                            break;
278                        }
279                    }
280                }
281            }
282        });
283
284        Ok(())
285    }
286
287    /// Handles data received.
288    ///
289    /// This method handles incoming packet demultiplex it by the source address
290    /// and the types of the message.
291    /// Caller should check if the packet was handled by this client or not.
292    /// If not handled, it is assumed that the packet is application data.
293    /// If an error is returned, the caller should discard the packet regardless.
294    async fn handle_inbound(
295        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
296        data: &[u8],
297        from: SocketAddr,
298        stun_serv_str: &str,
299        tr_map: &Arc<Mutex<TransactionMap>>,
300        binding_mgr: &Arc<Mutex<BindingManager>>,
301    ) -> Result<()> {
302        // +-------------------+-------------------------------+
303        // |   Return Values   |                               |
304        // +-------------------+       Meaning / Action        |
305        // | handled |  error  |                               |
306        // |=========+=========+===============================+
307        // |  false  |   nil   | Handle the packet as app data |
308        // |---------+---------+-------------------------------+
309        // |  true   |   nil   |        Nothing to do          |
310        // |---------+---------+-------------------------------+
311        // |  false  |  error  |     (shouldn't happen)        |
312        // |---------+---------+-------------------------------+
313        // |  true   |  error  | Error occurred while handling |
314        // +---------+---------+-------------------------------+
315        // Possible causes of the error:
316        //  - Malformed packet (parse error)
317        //  - STUN message was a request
318        //  - Non-STUN message from the STUN server
319
320        if is_message(data) {
321            ClientInternal::handle_stun_message(tr_map, read_ch_tx, data, from).await
322        } else if ChannelData::is_channel_data(data) {
323            ClientInternal::handle_channel_data(binding_mgr, read_ch_tx, data).await
324        } else if !stun_serv_str.is_empty() && from.to_string() == *stun_serv_str {
325            // received from STUN server but it is not a STUN message
326            Err(Error::ErrNonStunmessage)
327        } else {
328            // assume, this is an application data
329            log::trace!("non-STUN/TURN packect, unhandled");
330            Ok(())
331        }
332    }
333
334    async fn handle_stun_message(
335        tr_map: &Arc<Mutex<TransactionMap>>,
336        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
337        data: &[u8],
338        mut from: SocketAddr,
339    ) -> Result<()> {
340        let mut msg = Message::new();
341        msg.raw = data.to_vec();
342        msg.decode()?;
343
344        if msg.typ.class == CLASS_REQUEST {
345            return Err(Error::Other(format!(
346                "{:?} : {}",
347                Error::ErrUnexpectedStunrequestMessage,
348                msg
349            )));
350        }
351
352        if msg.typ.class == CLASS_INDICATION {
353            if msg.typ.method == METHOD_DATA {
354                let mut peer_addr = PeerAddress::default();
355                peer_addr.get_from(&msg)?;
356                from = SocketAddr::new(peer_addr.ip, peer_addr.port);
357
358                let mut data = Data::default();
359                data.get_from(&msg)?;
360
361                log::debug!("data indication received from {}", from);
362
363                let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &data.0, from).await;
364            }
365
366            return Ok(());
367        }
368
369        // This is a STUN response message (transactional)
370        // The type is either:
371        // - stun.ClassSuccessResponse
372        // - stun.ClassErrorResponse
373
374        let tr_key = BASE64_STANDARD.encode(msg.transaction_id.0);
375
376        let mut tm = tr_map.lock().await;
377        if tm.find(&tr_key).is_none() {
378            // silently discard
379            log::debug!("no transaction for {}", msg);
380            return Ok(());
381        }
382
383        if let Some(mut tr) = tm.delete(&tr_key) {
384            // End the transaction
385            tr.stop_rtx_timer();
386
387            if !tr
388                .write_result(TransactionResult {
389                    msg,
390                    from,
391                    retries: tr.retries(),
392                    ..Default::default()
393                })
394                .await
395            {
396                log::debug!("no listener for msg.raw {:?}", data);
397            }
398        }
399
400        Ok(())
401    }
402
403    async fn handle_channel_data(
404        binding_mgr: &Arc<Mutex<BindingManager>>,
405        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
406        data: &[u8],
407    ) -> Result<()> {
408        let mut ch_data = ChannelData {
409            raw: data.to_vec(),
410            ..Default::default()
411        };
412        ch_data.decode()?;
413
414        let addr = ClientInternal::find_addr_by_channel_number(binding_mgr, ch_data.number.0)
415            .await
416            .ok_or(Error::ErrChannelBindNotFound)?;
417
418        log::trace!(
419            "channel data received from {} (ch={})",
420            addr,
421            ch_data.number.0
422        );
423
424        let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &ch_data.data, addr).await;
425
426        Ok(())
427    }
428
429    /// Passes inbound data in RelayConn.
430    async fn handle_inbound_relay_conn(
431        read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
432        data: &[u8],
433        from: SocketAddr,
434    ) -> Result<()> {
435        let read_ch_tx_opt = read_ch_tx.lock().await;
436        log::debug!("read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
437        if let Some(tx) = &*read_ch_tx_opt {
438            log::debug!("try_send data = {:?}, from = {}", data, from);
439            if tx
440                .try_send(InboundData {
441                    data: data.to_vec(),
442                    from,
443                })
444                .is_err()
445            {
446                log::warn!("receive buffer full");
447            }
448            Ok(())
449        } else {
450            Err(Error::ErrAlreadyClosed)
451        }
452    }
453
454    /// Closes this client.
455    async fn close(&mut self) {
456        self.close_notify.cancel();
457        {
458            let mut read_ch_tx = self.read_ch_tx.lock().await;
459            read_ch_tx.take();
460        }
461        {
462            let mut tm = self.tr_map.lock().await;
463            tm.close_and_delete_all();
464        }
465    }
466
467    /// Sends a new STUN request to the given transport address.
468    async fn send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr> {
469        let msg = {
470            let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
471                vec![
472                    Box::new(TransactionId::new()),
473                    Box::new(BINDING_REQUEST),
474                    Box::new(self.software.clone()),
475                ]
476            } else {
477                vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
478            };
479
480            let mut msg = Message::new();
481            msg.build(&attrs)?;
482            msg
483        };
484
485        log::debug!("client.SendBindingRequestTo call PerformTransaction 1");
486        let tr_res = self.perform_transaction(&msg, to, false).await?;
487
488        let mut refl_addr = XorMappedAddress::default();
489        refl_addr.get_from(&tr_res.msg)?;
490
491        Ok(SocketAddr::new(refl_addr.ip, refl_addr.port))
492    }
493
494    /// Sends a new STUN request to the STUN server.
495    async fn send_binding_request(&mut self) -> Result<SocketAddr> {
496        if self.stun_serv_addr.is_empty() {
497            Err(Error::ErrStunserverAddressNotSet)
498        } else {
499            self.send_binding_request_to(&self.stun_serv_addr.clone())
500                .await
501        }
502    }
503
504    /// Returns a peer address associated with the
505    // channel number on this UDPConn
506    async fn find_addr_by_channel_number(
507        binding_mgr: &Arc<Mutex<BindingManager>>,
508        ch_num: u16,
509    ) -> Option<SocketAddr> {
510        let bm = binding_mgr.lock().await;
511        bm.find_by_number(ch_num).map(|b| b.addr)
512    }
513
514    /// Sends a TURN allocation request to the given transport address.
515    async fn allocate(&mut self) -> Result<RelayConnConfig> {
516        {
517            let read_ch_tx = self.read_ch_tx.lock().await;
518            log::debug!("allocate check: read_ch_tx_opt = {}", read_ch_tx.is_some());
519            if read_ch_tx.is_some() {
520                return Err(Error::ErrOneAllocateOnly);
521            }
522        }
523
524        let mut msg = Message::new();
525        msg.build(&[
526            Box::new(TransactionId::new()),
527            Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
528            Box::new(RequestedTransport {
529                protocol: PROTO_UDP,
530            }),
531            Box::new(FINGERPRINT),
532        ])?;
533
534        log::debug!("client.Allocate call PerformTransaction 1");
535        let tr_res = self
536            .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
537            .await?;
538        let res = tr_res.msg;
539
540        // Anonymous allocate failed, trying to authenticate.
541        let nonce = Nonce::get_from_as(&res, ATTR_NONCE)?;
542        self.realm = Realm::get_from_as(&res, ATTR_REALM)?;
543
544        self.integrity = MessageIntegrity::new_long_term_integrity(
545            self.username.text.clone(),
546            self.realm.text.clone(),
547            self.password.clone(),
548        );
549
550        // Trying to authorize.
551        msg.build(&[
552            Box::new(TransactionId::new()),
553            Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
554            Box::new(RequestedTransport {
555                protocol: PROTO_UDP,
556            }),
557            Box::new(self.username.clone()),
558            Box::new(self.realm.clone()),
559            Box::new(nonce.clone()),
560            Box::new(self.integrity.clone()),
561            Box::new(FINGERPRINT),
562        ])?;
563
564        log::debug!("client.Allocate call PerformTransaction 2");
565        let tr_res = self
566            .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
567            .await?;
568        let res = tr_res.msg;
569
570        if res.typ.class == CLASS_ERROR_RESPONSE {
571            let mut code = ErrorCodeAttribute::default();
572            let result = code.get_from(&res);
573            if result.is_err() {
574                return Err(Error::Other(format!("{}", res.typ)));
575            } else {
576                return Err(Error::Other(format!("{} (error {})", res.typ, code)));
577            }
578        }
579
580        // Getting relayed addresses from response.
581        let mut relayed = RelayedAddress::default();
582        relayed.get_from(&res)?;
583        let relayed_addr = SocketAddr::new(relayed.ip, relayed.port);
584
585        // Getting lifetime from response
586        let mut lifetime = Lifetime::default();
587        lifetime.get_from(&res)?;
588
589        let (read_ch_tx, read_ch_rx) = mpsc::channel(MAX_READ_QUEUE_SIZE);
590        {
591            let mut read_ch_tx_opt = self.read_ch_tx.lock().await;
592            *read_ch_tx_opt = Some(read_ch_tx);
593            log::debug!("allocate: read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
594        }
595
596        Ok(RelayConnConfig {
597            relayed_addr,
598            integrity: self.integrity.clone(),
599            nonce,
600            lifetime: lifetime.0,
601            binding_mgr: Arc::clone(&self.binding_mgr),
602            read_ch_rx: Arc::new(Mutex::new(read_ch_rx)),
603        })
604    }
605}
606
607/// Client is a STUN server client.
608#[derive(Clone)]
609pub struct Client {
610    client_internal: Arc<Mutex<ClientInternal>>,
611}
612
613impl Client {
614    pub async fn new(config: ClientConfig) -> Result<Self> {
615        let ci = ClientInternal::new(config).await?;
616        Ok(Client {
617            client_internal: Arc::new(Mutex::new(ci)),
618        })
619    }
620
621    pub async fn listen(&self) -> Result<()> {
622        let ci = self.client_internal.lock().await;
623        ci.listen().await
624    }
625
626    pub async fn allocate(&self) -> Result<impl Conn> {
627        let config = {
628            let mut ci = self.client_internal.lock().await;
629            ci.allocate().await?
630        };
631
632        Ok(RelayConn::new(Arc::clone(&self.client_internal), config).await)
633    }
634
635    pub async fn close(&self) -> Result<()> {
636        let mut ci = self.client_internal.lock().await;
637        ci.close().await;
638        Ok(())
639    }
640
641    /// Sends a new STUN request to the given transport address.
642    pub async fn send_binding_request_to(&self, to: &str) -> Result<SocketAddr> {
643        let mut ci = self.client_internal.lock().await;
644        ci.send_binding_request_to(to).await
645    }
646
647    /// Sends a new STUN request to the STUN server.
648    pub async fn send_binding_request(&self) -> Result<SocketAddr> {
649        let mut ci = self.client_internal.lock().await;
650        ci.send_binding_request().await
651    }
652}