rtc_turn/client/
mod.rs

1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod permission;
6pub mod relay;
7pub mod transaction;
8
9use bytes::BytesMut;
10use log::{debug, trace};
11use std::collections::{HashMap, VecDeque};
12use std::net::SocketAddr;
13use std::time::Instant;
14
15use stun::attributes::*;
16use stun::integrity::*;
17use stun::message::*;
18use stun::textattrs::*;
19use stun::xoraddr::*;
20
21use binding::*;
22use transaction::*;
23
24use crate::client::relay::{Relay, RelayState};
25use crate::proto::chandata::*;
26use crate::proto::channum::ChannelNumber;
27use crate::proto::data::*;
28use crate::proto::lifetime::Lifetime;
29use crate::proto::peeraddr::*;
30use crate::proto::relayaddr::RelayedAddress;
31use crate::proto::reqtrans::RequestedTransport;
32use crate::proto::{PROTO_TCP, PROTO_UDP};
33use shared::error::{Error, Result};
34use shared::util::lookup_host;
35use shared::{TransportContext, TransportMessage, TransportProtocol};
36use stun::error_code::ErrorCodeAttribute;
37use stun::fingerprint::FINGERPRINT;
38
/// Retransmission interval, in milliseconds, used when `ClientConfig::rto_in_ms` is zero.
const DEFAULT_RTO_IN_MS: u64 = 200;

/// Upper bound for a data buffer (message size limit for Chromium).
const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize;

/// Upper bound for the read queue length.
const MAX_READ_QUEUE_SIZE: usize = 1024;
42
/// Address allocated on the TURN server; used as the key of `Client::relays`.
pub type RelayedAddr = SocketAddr;

/// Address reported in a Binding response (taken from XOR-MAPPED-ADDRESS).
pub type ReflexiveAddr = SocketAddr;

/// Address of a remote peer reached through a relay.
pub type PeerAddr = SocketAddr;
46
47#[derive(Debug)]
48pub enum Event {
49    TransactionTimeout(TransactionId),
50
51    BindingResponse(TransactionId, ReflexiveAddr),
52    BindingError(TransactionId, Error),
53
54    AllocateResponse(TransactionId, RelayedAddr),
55    AllocateError(TransactionId, Error),
56
57    CreatePermissionResponse(TransactionId, PeerAddr),
58    CreatePermissionError(TransactionId, Error),
59
60    DataIndicationOrChannelData(Option<ChannelNumber>, PeerAddr, BytesMut),
61}
62
63enum AllocateState {
64    Attempting,
65    Requesting(TextAttribute),
66}
67
68//              interval [msec]
69// 0: 0 ms      +500
70// 1: 500 ms	+1000
71// 2: 1500 ms   +2000
72// 3: 3500 ms   +4000
73// 4: 7500 ms   +8000
74// 5: 15500 ms  +16000
75// 6: 31500 ms  +32000
76// -: 63500 ms  failed
77
78/// ClientConfig is a bag of config parameters for Client.
79pub struct ClientConfig {
80    pub stun_serv_addr: String, // STUN server address (e.g. "stun.abc.com:3478")
81    pub turn_serv_addr: String, // TURN server address (e.g. "turn.abc.com:3478")
82    pub local_addr: SocketAddr,
83    pub transport_protocol: TransportProtocol,
84    pub username: String,
85    pub password: String,
86    pub realm: String,
87    pub software: String,
88    pub rto_in_ms: u64,
89}
90
91/// Client is a STUN client
92pub struct Client {
93    stun_serv_addr: Option<SocketAddr>,
94    turn_serv_addr: Option<SocketAddr>,
95    local_addr: SocketAddr,
96    transport_protocol: TransportProtocol,
97    username: Username,
98    password: String,
99    realm: Realm,
100    integrity: MessageIntegrity,
101    software: Software,
102    tr_map: TransactionMap,
103    binding_mgr: BindingManager,
104    rto_in_ms: u64,
105
106    relays: HashMap<RelayedAddr, RelayState>,
107    transmits: VecDeque<TransportMessage<BytesMut>>,
108    events: VecDeque<Event>,
109}
110
111impl Client {
112    /// new returns a new Client instance. listeningAddress is the address and port to listen on, default "0.0.0.0:0"
113    pub fn new(config: ClientConfig) -> Result<Self> {
114        let stun_serv_addr = if config.stun_serv_addr.is_empty() {
115            None
116        } else {
117            Some(lookup_host(
118                config.local_addr.is_ipv4(),
119                config.stun_serv_addr.as_str(),
120            )?)
121        };
122
123        let turn_serv_addr = if config.turn_serv_addr.is_empty() {
124            None
125        } else {
126            Some(lookup_host(
127                config.local_addr.is_ipv4(),
128                config.turn_serv_addr.as_str(),
129            )?)
130        };
131
132        Ok(Client {
133            stun_serv_addr,
134            turn_serv_addr,
135            local_addr: config.local_addr,
136            transport_protocol: config.transport_protocol,
137            username: Username::new(ATTR_USERNAME, config.username),
138            password: config.password,
139            realm: Realm::new(ATTR_REALM, config.realm),
140            software: Software::new(ATTR_SOFTWARE, config.software),
141            tr_map: TransactionMap::new(),
142            binding_mgr: BindingManager::new(),
143            rto_in_ms: if config.rto_in_ms != 0 {
144                config.rto_in_ms
145            } else {
146                DEFAULT_RTO_IN_MS
147            },
148            integrity: MessageIntegrity::new_short_term_integrity(String::new()),
149
150            relays: HashMap::new(),
151            transmits: VecDeque::new(),
152            events: VecDeque::new(),
153        })
154    }
155
156    pub fn poll_timout(&mut self) -> Option<Instant> {
157        let mut eto = None;
158        if let Some(to) = self.tr_map.poll_timout()
159            && (eto.is_none() || to < *eto.as_ref().unwrap())
160        {
161            eto = Some(to);
162        }
163
164        #[allow(clippy::map_clone)]
165        let relayed_addrs: Vec<SocketAddr> = self.relays.keys().map(|key| *key).collect();
166        for relayed_addr in relayed_addrs {
167            let relay = Relay {
168                relayed_addr,
169                client: self,
170            };
171            if let Some(to) = relay.poll_timeout()
172                && (eto.is_none() || to < *eto.as_ref().unwrap())
173            {
174                eto = Some(to);
175            }
176        }
177
178        eto
179    }
180
181    pub fn handle_timeout(&mut self, now: Instant) {
182        self.tr_map.handle_timeout(now);
183
184        #[allow(clippy::map_clone)]
185        let relayed_addrs: Vec<SocketAddr> = self.relays.keys().map(|key| *key).collect();
186        for relayed_addr in relayed_addrs {
187            let mut relay = Relay {
188                relayed_addr,
189                client: self,
190            };
191            relay.handle_timeout(now);
192        }
193    }
194
195    pub fn poll_transmit(&mut self) -> Option<TransportMessage<BytesMut>> {
196        while let Some(transmit) = self.tr_map.poll_transmit() {
197            self.transmits.push_back(transmit);
198        }
199        self.transmits.pop_front()
200    }
201
202    pub fn handle_transmit(&mut self, msg: TransportMessage<BytesMut>) -> Result<()> {
203        self.handle_inbound(&msg.message[..], msg.transport.peer_addr)
204    }
205
206    pub fn poll_event(&mut self) -> Option<Event> {
207        while let Some(event) = self.tr_map.poll_event() {
208            self.events.push_back(event);
209        }
210        self.events.pop_front()
211    }
212
213    // handle_inbound handles data received.
214    // This method handles incoming packet demultiplex it by the source address
215    // and the types of the message.
216    // This return Ok(handled or not) and if there was an error.
217    // Caller should check if the packet was handled by this client or not.
218    // If not handled, it is assumed that the packet is application data.
219    // If an error is returned, the caller should discard the packet regardless.
220    fn handle_inbound(&mut self, data: &[u8], from: SocketAddr) -> Result<()> {
221        // +-------------------+-------------------------------+
222        // |   Return Values   |                               |
223        // +-------------------+       Meaning / Action        |
224        // | handled |  error  |                               |
225        // |=========+=========+===============================+
226        // |  false  |   nil   | Handle the packet as app data |
227        // |---------+---------+-------------------------------+
228        // |  true   |   nil   |        Nothing to do          |
229        // |---------+---------+-------------------------------+
230        // |  false  |  error  |     (shouldn't happen)        |
231        // |---------+---------+-------------------------------+
232        // |  true   |  error  | Error occurred while handling |
233        // +---------+---------+-------------------------------+
234        // Possible causes of the error:
235        //  - Malformed packet (parse error)
236        //  - STUN message was a request
237        //  - Non-STUN message from the STUN server
238
239        if is_stun_message(data) {
240            self.handle_stun_message(data)
241        } else if ChannelData::is_channel_data(data) {
242            self.handle_channel_data(data)
243        } else if self.stun_serv_addr.is_some() && &from == self.stun_serv_addr.as_ref().unwrap() {
244            // received from STUN server, but it is not a STUN message
245            Err(Error::ErrNonStunmessage)
246        } else {
247            // assume, this is an application data
248            trace!("non-STUN/TURN packet, unhandled");
249            Ok(())
250        }
251    }
252
253    fn handle_stun_message(&mut self, data: &[u8]) -> Result<()> {
254        let mut msg = Message::new();
255        msg.raw = data.to_vec();
256        msg.decode()?;
257
258        if msg.typ.class == CLASS_REQUEST {
259            return Err(Error::Other(format!(
260                "{:?} : {}",
261                Error::ErrUnexpectedStunrequestMessage,
262                msg
263            )));
264        }
265
266        if msg.typ.class == CLASS_INDICATION {
267            if msg.typ.method == METHOD_DATA {
268                let mut peer_addr = PeerAddress::default();
269                peer_addr.get_from(&msg)?;
270                let from = SocketAddr::new(peer_addr.ip, peer_addr.port);
271
272                let mut data = Data::default();
273                data.get_from(&msg)?;
274
275                debug!("data indication received from {}", from);
276
277                self.events.push_back(Event::DataIndicationOrChannelData(
278                    None,
279                    from,
280                    BytesMut::from(&data.0[..]),
281                ))
282            }
283
284            return Ok(());
285        }
286
287        // This is a STUN response message (transactional)
288        // The type is either:
289        // - stun.ClassSuccessResponse
290        // - stun.ClassErrorResponse
291
292        if self.tr_map.find(&msg.transaction_id).is_none() {
293            // silently discard
294            debug!("no transaction for {}", msg);
295            return Ok(());
296        }
297
298        if let Some(tr) = self.tr_map.delete(&msg.transaction_id) {
299            match msg.typ.method {
300                METHOD_BINDING => {
301                    if msg.typ.class == CLASS_ERROR_RESPONSE {
302                        let mut code = ErrorCodeAttribute::default();
303                        let err = if code.get_from(&msg).is_err() {
304                            Error::Other(format!("{}", msg.typ))
305                        } else {
306                            Error::Other(format!("{} (error {})", msg.typ, code))
307                        };
308                        self.events
309                            .push_back(Event::BindingError(tr.transaction_id, err));
310                    } else {
311                        let mut refl_addr = XorMappedAddress::default();
312                        match refl_addr.get_from(&msg) {
313                            Ok(_) => {
314                                self.events.push_back(Event::BindingResponse(
315                                    tr.transaction_id,
316                                    ReflexiveAddr::new(refl_addr.ip, refl_addr.port),
317                                ));
318                            }
319                            Err(err) => {
320                                self.events
321                                    .push_back(Event::BindingError(tr.transaction_id, err));
322                            }
323                        }
324                    }
325                }
326                METHOD_ALLOCATE => {
327                    self.handle_allocate_response(msg, tr.transaction_type)?;
328                }
329                METHOD_CREATE_PERMISSION => {
330                    if let TransactionType::CreatePermissionRequest(relayed_addr, peer_addr) =
331                        tr.transaction_type
332                    {
333                        let mut relay = Relay {
334                            relayed_addr,
335                            client: self,
336                        };
337                        relay.handle_create_permission_response(msg, peer_addr)?;
338                    }
339                }
340                METHOD_REFRESH => {
341                    if let TransactionType::RefreshRequest(relayed_addr) = tr.transaction_type {
342                        let mut relay = Relay {
343                            relayed_addr,
344                            client: self,
345                        };
346                        relay.handle_refresh_allocation_response(msg)?;
347                    }
348                }
349                METHOD_CHANNEL_BIND => {
350                    if let TransactionType::ChannelBindRequest(relayed_addr, bind_addr) =
351                        tr.transaction_type
352                    {
353                        let mut relay = Relay {
354                            relayed_addr,
355                            client: self,
356                        };
357                        relay.handle_channel_bind_response(msg, bind_addr)?;
358                    }
359                }
360                _ => {}
361            }
362        }
363
364        Ok(())
365    }
366
367    fn handle_channel_data(&mut self, data: &[u8]) -> Result<()> {
368        let mut ch_data = ChannelData {
369            raw: data.to_vec(),
370            ..Default::default()
371        };
372        ch_data.decode()?;
373
374        let addr = self
375            .find_addr_by_channel_number(ch_data.number.0)
376            .ok_or(Error::ErrChannelBindNotFound)?;
377
378        trace!(
379            "channel data received from {} (ch={})",
380            addr, ch_data.number.0
381        );
382
383        self.events.push_back(Event::DataIndicationOrChannelData(
384            Some(ch_data.number),
385            addr,
386            BytesMut::from(&ch_data.data[..]),
387        ));
388
389        Ok(())
390    }
391
392    /// Close closes this client
393    pub fn close(&mut self) {
394        self.tr_map.delete_all();
395    }
396
397    pub fn relay(&mut self, relayed_addr: SocketAddr) -> Result<Relay<'_>> {
398        if !self.relays.contains_key(&relayed_addr) {
399            Err(Error::ErrStreamNotExisted)
400        } else {
401            Ok(Relay {
402                relayed_addr,
403                client: self,
404            })
405        }
406    }
407
408    /// send_binding_request_to sends a new STUN request to the given transport address
409    /// return key to find out corresponding Event either BindingResponse or BindingRequestTimeout
410    pub fn send_binding_request_to(&mut self, to: SocketAddr) -> Result<TransactionId> {
411        let msg = {
412            let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
413                vec![
414                    Box::new(TransactionId::new()),
415                    Box::new(BINDING_REQUEST),
416                    Box::new(self.software.clone()),
417                ]
418            } else {
419                vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
420            };
421
422            let mut msg = Message::new();
423            msg.build(&attrs)?;
424            msg
425        };
426
427        debug!("client.SendBindingRequestTo call PerformTransaction 1");
428        Ok(self.perform_transaction(&msg, to, TransactionType::BindingRequest))
429    }
430
431    /// send_binding_request sends a new STUN request to the STUN server
432    /// return key to find out corresponding Event either BindingResponse or BindingRequestTimeout
433    pub fn send_binding_request(&mut self) -> Result<TransactionId> {
434        if let Some(stun_serv_addr) = &self.stun_serv_addr {
435            self.send_binding_request_to(*stun_serv_addr)
436        } else {
437            Err(Error::ErrStunserverAddressNotSet)
438        }
439    }
440
441    // find_addr_by_channel_number returns a peer address associated with the
442    // channel number on this UDPConn
443    fn find_addr_by_channel_number(&self, ch_num: u16) -> Option<SocketAddr> {
444        self.binding_mgr.find_by_number(ch_num).map(|b| b.addr)
445    }
446
447    // stun_server_addr return the STUN server address
448    fn stun_server_addr(&self) -> Option<SocketAddr> {
449        self.stun_serv_addr
450    }
451
452    /* https://datatracker.ietf.org/doc/html/rfc8656#section-20
453    TURN                                 TURN          Peer         Peer
454    client                               server         A            B
455      |                                    |            |            |
456      |--- Allocate request -------------->|            |            |
457      |    Transaction-Id=0xA56250D3F17ABE679422DE85    |            |
458      |    SOFTWARE="Example client, version 1.03"      |            |
459      |    LIFETIME=3600 (1 hour)          |            |            |
460      |    REQUESTED-TRANSPORT=17 (UDP)    |            |            |
461      |    DONT-FRAGMENT                   |            |            |
462      |                                    |            |            |
463      |<-- Allocate error response --------|            |            |
464      |    Transaction-Id=0xA56250D3F17ABE679422DE85    |            |
465      |    SOFTWARE="Example server, version 1.17"      |            |
466      |    ERROR-CODE=401 (Unauthorized)   |            |            |
467      |    REALM="example.com"             |            |            |
468      |    NONCE="obMatJos2gAAAadl7W7PeDU4hKE72jda"     |            |
469      |    PASSWORD-ALGORITHMS=MD5 and SHA256           |            |
470      |                                    |            |            |
471      |--- Allocate request -------------->|            |            |
472      |    Transaction-Id=0xC271E932AD7446A32C234492    |            |
473      |    SOFTWARE="Example client 1.03"  |            |            |
474      |    LIFETIME=3600 (1 hour)          |            |            |
475      |    REQUESTED-TRANSPORT=17 (UDP)    |            |            |
476      |    DONT-FRAGMENT                   |            |            |
477      |    USERNAME="George"               |            |            |
478      |    REALM="example.com"             |            |            |
479      |    NONCE="obMatJos2gAAAadl7W7PeDU4hKE72jda"     |            |
480      |    PASSWORD-ALGORITHMS=MD5 and SHA256           |            |
481      |    PASSWORD-ALGORITHM=SHA256       |            |            |
482      |    MESSAGE-INTEGRITY=...           |            |            |
483      |    MESSAGE-INTEGRITY-SHA256=...    |            |            |
484      |                                    |            |            |
485      |<-- Allocate success response ------|            |            |
486      |    Transaction-Id=0xC271E932AD7446A32C234492    |            |
487      |    SOFTWARE="Example server, version 1.17"      |            |
488      |    LIFETIME=1200 (20 minutes)      |            |            |
489      |    XOR-RELAYED-ADDRESS=192.0.2.15:50000         |            |
490      |    XOR-MAPPED-ADDRESS=192.0.2.1:7000            |            |
491      |    MESSAGE-INTEGRITY-SHA256=...    |            |            |
492    */
493    /// Allocate sends a TURN allocation request to the given transport address
494    pub fn allocate(&mut self) -> Result<TransactionId> {
495        let mut msg = Message::new();
496        msg.build(&[
497            Box::new(TransactionId::new()),
498            Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
499            Box::new(RequestedTransport {
500                protocol: if self.transport_protocol == TransportProtocol::UDP {
501                    PROTO_UDP
502                } else {
503                    PROTO_TCP
504                },
505            }),
506            Box::new(FINGERPRINT),
507        ])?;
508
509        debug!("client.Allocate call PerformTransaction 1");
510        let mut tid = self.perform_transaction(
511            &msg,
512            self.turn_server_addr()?,
513            TransactionType::AllocateAttempt,
514        );
515        tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
516        Ok(tid)
517    }
518
519    fn handle_allocate_response(
520        &mut self,
521        response: Message,
522        allocate_state: TransactionType,
523    ) -> Result<()> {
524        match allocate_state {
525            TransactionType::AllocateAttempt => {
526                // Anonymous allocate failed, trying to authenticate.
527                let nonce = match Nonce::get_from_as(&response, ATTR_NONCE) {
528                    Ok(nonce) => nonce,
529                    Err(err) => {
530                        self.events
531                            .push_back(Event::AllocateError(response.transaction_id, err));
532                        return Ok(());
533                    }
534                };
535                self.realm = match Realm::get_from_as(&response, ATTR_REALM) {
536                    Ok(realm) => realm,
537                    Err(err) => {
538                        self.events
539                            .push_back(Event::AllocateError(response.transaction_id, err));
540                        return Ok(());
541                    }
542                };
543
544                self.integrity = MessageIntegrity::new_long_term_integrity(
545                    self.username.text.clone(),
546                    self.realm.text.clone(),
547                    self.password.clone(),
548                );
549
550                let mut msg = Message::new();
551
552                // make it same as allocate() return value so that client can retrieve it
553                // from Event::AllocateResponse
554                let mut tid = response.transaction_id;
555                tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
556
557                // Trying to authorize.
558                msg.build(&[
559                    Box::new(tid),
560                    Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
561                    Box::new(RequestedTransport {
562                        protocol: if self.transport_protocol == TransportProtocol::UDP {
563                            PROTO_UDP
564                        } else {
565                            PROTO_TCP
566                        },
567                    }),
568                    Box::new(self.username.clone()),
569                    Box::new(self.realm.clone()),
570                    Box::new(nonce.clone()),
571                    Box::new(self.integrity.clone()),
572                    Box::new(FINGERPRINT),
573                ])?;
574
575                debug!("client.Allocate call PerformTransaction 2");
576                self.perform_transaction(
577                    &msg,
578                    self.turn_server_addr()?,
579                    TransactionType::AllocateRequest(nonce),
580                );
581            }
582            TransactionType::AllocateRequest(nonce) => {
583                if response.typ.class == CLASS_ERROR_RESPONSE {
584                    let mut code = ErrorCodeAttribute::default();
585                    let err = if code.get_from(&response).is_err() {
586                        Error::Other(format!("{}", response.typ))
587                    } else {
588                        Error::Other(format!("{} (error {})", response.typ, code))
589                    };
590                    self.events
591                        .push_back(Event::AllocateError(response.transaction_id, err));
592                    return Ok(());
593                }
594
595                // Getting relayed addresses from response.
596                let mut relayed = RelayedAddress::default();
597                relayed.get_from(&response)?;
598                let relayed_addr = RelayedAddr::new(relayed.ip, relayed.port);
599
600                // Getting lifetime from response
601                let mut lifetime = Lifetime::default();
602                lifetime.get_from(&response)?;
603
604                self.relays.insert(
605                    relayed_addr,
606                    RelayState::new(relayed_addr, self.integrity.clone(), nonce, lifetime.0),
607                );
608                self.events.push_back(Event::AllocateResponse(
609                    response.transaction_id,
610                    relayed_addr,
611                ));
612            }
613            _ => {}
614        }
615        Ok(())
616    }
617
618    /// turn_server_addr return the TURN server address
619    fn turn_server_addr(&self) -> Result<SocketAddr> {
620        self.turn_serv_addr.ok_or(Error::ErrNilTurnSocket)
621    }
622
623    /// username returns username
624    fn username(&self) -> Username {
625        self.username.clone()
626    }
627
628    /// realm return realm
629    fn realm(&self) -> Realm {
630        self.realm.clone()
631    }
632
633    /// WriteTo sends data to the specified destination using the base socket.
634    fn write_to(&mut self, data: &[u8], remote: SocketAddr) {
635        self.transmits.push_back(TransportMessage {
636            now: Instant::now(),
637            transport: TransportContext {
638                local_addr: self.local_addr,
639                peer_addr: remote,
640                transport_protocol: self.transport_protocol,
641                ecn: None,
642            },
643            message: BytesMut::from(data),
644        });
645    }
646
647    // PerformTransaction performs STUN transaction
648    fn perform_transaction(
649        &mut self,
650        msg: &Message,
651        to: SocketAddr,
652        transaction_type: TransactionType,
653    ) -> TransactionId {
654        let tr = Transaction::new(TransactionConfig {
655            transaction_id: msg.transaction_id,
656            transaction_type,
657            raw: BytesMut::from(&msg.raw[..]),
658            local_addr: self.local_addr,
659            peer_addr: to,
660            transport_protocol: self.transport_protocol,
661            interval: self.rto_in_ms,
662        });
663
664        trace!(
665            "start {} transaction {:?} to {}",
666            msg.typ, msg.transaction_id, tr.peer_addr
667        );
668        self.tr_map.insert(msg.transaction_id, tr);
669
670        self.write_to(&msg.raw, to);
671
672        msg.transaction_id
673    }
674}