rtc_turn/client/
mod.rs

1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod permission;
6mod proto;
7pub mod relay;
8pub mod transaction;
9
10use bytes::BytesMut;
11use log::{debug, trace};
12use std::collections::{HashMap, VecDeque};
13use std::net::SocketAddr;
14use std::time::Instant;
15
16use stun::attributes::*;
17use stun::integrity::*;
18use stun::message::*;
19use stun::textattrs::*;
20use stun::xoraddr::*;
21
22use binding::*;
23use transaction::*;
24
25use crate::client::relay::{Relay, RelayState};
26use crate::proto::chandata::*;
27use crate::proto::channum::ChannelNumber;
28use crate::proto::data::*;
29use crate::proto::lifetime::Lifetime;
30use crate::proto::peeraddr::*;
31use crate::proto::relayaddr::RelayedAddress;
32use crate::proto::reqtrans::RequestedTransport;
33use crate::proto::{PROTO_TCP, PROTO_UDP};
34use shared::error::{Error, Result};
35use shared::util::lookup_host;
36use shared::{TransportContext, TransportMessage, TransportProtocol};
37use stun::error_code::ErrorCodeAttribute;
38use stun::fingerprint::FINGERPRINT;
39
40const DEFAULT_RTO_IN_MS: u64 = 200;
41const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize; // message size limit for Chromium
42const MAX_READ_QUEUE_SIZE: usize = 1024;
43
44pub type RelayedAddr = SocketAddr;
45pub type ReflexiveAddr = SocketAddr;
46pub type PeerAddr = SocketAddr;
47
48#[derive(Debug)]
49pub enum Event {
50    TransactionTimeout(TransactionId),
51
52    BindingResponse(TransactionId, ReflexiveAddr),
53    BindingError(TransactionId, Error),
54
55    AllocateResponse(TransactionId, RelayedAddr),
56    AllocateError(TransactionId, Error),
57
58    CreatePermissionResponse(TransactionId, PeerAddr),
59    CreatePermissionError(TransactionId, Error),
60
61    DataIndicationOrChannelData(Option<ChannelNumber>, PeerAddr, BytesMut),
62}
63
64enum AllocateState {
65    Attempting,
66    Requesting(TextAttribute),
67}
68
69//              interval [msec]
70// 0: 0 ms      +500
71// 1: 500 ms	+1000
72// 2: 1500 ms   +2000
73// 3: 3500 ms   +4000
74// 4: 7500 ms   +8000
75// 5: 15500 ms  +16000
76// 6: 31500 ms  +32000
77// -: 63500 ms  failed
78
79/// ClientConfig is a bag of config parameters for Client.
80pub struct ClientConfig {
81    pub stun_serv_addr: String, // STUN server address (e.g. "stun.abc.com:3478")
82    pub turn_serv_addr: String, // TURN server address (e.g. "turn.abc.com:3478")
83    pub local_addr: SocketAddr,
84    pub transport_protocol: TransportProtocol,
85    pub username: String,
86    pub password: String,
87    pub realm: String,
88    pub software: String,
89    pub rto_in_ms: u64,
90}
91
92/// Client is a STUN client
93pub struct Client {
94    stun_serv_addr: Option<SocketAddr>,
95    turn_serv_addr: Option<SocketAddr>,
96    local_addr: SocketAddr,
97    transport_protocol: TransportProtocol,
98    username: Username,
99    password: String,
100    realm: Realm,
101    integrity: MessageIntegrity,
102    software: Software,
103    tr_map: TransactionMap,
104    binding_mgr: BindingManager,
105    rto_in_ms: u64,
106
107    relays: HashMap<RelayedAddr, RelayState>,
108    transmits: VecDeque<TransportMessage<BytesMut>>,
109    events: VecDeque<Event>,
110}
111
112impl Client {
113    /// new returns a new Client instance. listeningAddress is the address and port to listen on, default "0.0.0.0:0"
114    pub fn new(config: ClientConfig) -> Result<Self> {
115        let stun_serv_addr = if config.stun_serv_addr.is_empty() {
116            None
117        } else {
118            Some(lookup_host(
119                config.local_addr.is_ipv4(),
120                config.stun_serv_addr.as_str(),
121            )?)
122        };
123
124        let turn_serv_addr = if config.turn_serv_addr.is_empty() {
125            None
126        } else {
127            Some(lookup_host(
128                config.local_addr.is_ipv4(),
129                config.turn_serv_addr.as_str(),
130            )?)
131        };
132
133        Ok(Client {
134            stun_serv_addr,
135            turn_serv_addr,
136            local_addr: config.local_addr,
137            transport_protocol: config.transport_protocol,
138            username: Username::new(ATTR_USERNAME, config.username),
139            password: config.password,
140            realm: Realm::new(ATTR_REALM, config.realm),
141            software: Software::new(ATTR_SOFTWARE, config.software),
142            tr_map: TransactionMap::new(),
143            binding_mgr: BindingManager::new(),
144            rto_in_ms: if config.rto_in_ms != 0 {
145                config.rto_in_ms
146            } else {
147                DEFAULT_RTO_IN_MS
148            },
149            integrity: MessageIntegrity::new_short_term_integrity(String::new()),
150
151            relays: HashMap::new(),
152            transmits: VecDeque::new(),
153            events: VecDeque::new(),
154        })
155    }
156
157    // handle_inbound handles data received.
158    // This method handles incoming packet demultiplex it by the source address
159    // and the types of the message.
160    // This return Ok(handled or not) and if there was an error.
161    // Caller should check if the packet was handled by this client or not.
162    // If not handled, it is assumed that the packet is application data.
163    // If an error is returned, the caller should discard the packet regardless.
164    fn handle_inbound(&mut self, data: &[u8], from: SocketAddr) -> Result<()> {
165        // +-------------------+-------------------------------+
166        // |   Return Values   |                               |
167        // +-------------------+       Meaning / Action        |
168        // | handled |  error  |                               |
169        // |=========+=========+===============================+
170        // |  false  |   nil   | Handle the packet as app data |
171        // |---------+---------+-------------------------------+
172        // |  true   |   nil   |        Nothing to do          |
173        // |---------+---------+-------------------------------+
174        // |  false  |  error  |     (shouldn't happen)        |
175        // |---------+---------+-------------------------------+
176        // |  true   |  error  | Error occurred while handling |
177        // +---------+---------+-------------------------------+
178        // Possible causes of the error:
179        //  - Malformed packet (parse error)
180        //  - STUN message was a request
181        //  - Non-STUN message from the STUN server
182
183        if is_stun_message(data) {
184            self.handle_stun_message(data)
185        } else if ChannelData::is_channel_data(data) {
186            self.handle_channel_data(data)
187        } else if self.stun_serv_addr.is_some() && &from == self.stun_serv_addr.as_ref().unwrap() {
188            // received from STUN server, but it is not a STUN message
189            Err(Error::ErrNonStunmessage)
190        } else {
191            // assume, this is an application data
192            trace!("non-STUN/TURN packet, unhandled");
193            Ok(())
194        }
195    }
196
197    fn handle_stun_message(&mut self, data: &[u8]) -> Result<()> {
198        let mut msg = Message::new();
199        msg.raw = data.to_vec();
200        msg.decode()?;
201
202        if msg.typ.class == CLASS_REQUEST {
203            return Err(Error::Other(format!(
204                "{:?} : {}",
205                Error::ErrUnexpectedStunrequestMessage,
206                msg
207            )));
208        }
209
210        if msg.typ.class == CLASS_INDICATION {
211            if msg.typ.method == METHOD_DATA {
212                let mut peer_addr = PeerAddress::default();
213                peer_addr.get_from(&msg)?;
214                let from = SocketAddr::new(peer_addr.ip, peer_addr.port);
215
216                let mut data = Data::default();
217                data.get_from(&msg)?;
218
219                debug!("data indication received from {}", from);
220
221                self.events.push_back(Event::DataIndicationOrChannelData(
222                    None,
223                    from,
224                    BytesMut::from(&data.0[..]),
225                ))
226            }
227
228            return Ok(());
229        }
230
231        // This is a STUN response message (transactional)
232        // The type is either:
233        // - stun.ClassSuccessResponse
234        // - stun.ClassErrorResponse
235
236        if self.tr_map.find(&msg.transaction_id).is_none() {
237            // silently discard
238            debug!("no transaction for {}", msg);
239            return Ok(());
240        }
241
242        if let Some(tr) = self.tr_map.delete(&msg.transaction_id) {
243            match msg.typ.method {
244                METHOD_BINDING => {
245                    if msg.typ.class == CLASS_ERROR_RESPONSE {
246                        let mut code = ErrorCodeAttribute::default();
247                        let err = if code.get_from(&msg).is_err() {
248                            Error::Other(format!("{}", msg.typ))
249                        } else {
250                            Error::Other(format!("{} (error {})", msg.typ, code))
251                        };
252                        self.events
253                            .push_back(Event::BindingError(tr.transaction_id, err));
254                    } else {
255                        let mut refl_addr = XorMappedAddress::default();
256                        match refl_addr.get_from(&msg) {
257                            Ok(_) => {
258                                self.events.push_back(Event::BindingResponse(
259                                    tr.transaction_id,
260                                    ReflexiveAddr::new(refl_addr.ip, refl_addr.port),
261                                ));
262                            }
263                            Err(err) => {
264                                self.events
265                                    .push_back(Event::BindingError(tr.transaction_id, err));
266                            }
267                        }
268                    }
269                }
270                METHOD_ALLOCATE => {
271                    self.handle_allocate_response(msg, tr.transaction_type)?;
272                }
273                METHOD_CREATE_PERMISSION => {
274                    if let TransactionType::CreatePermissionRequest(relayed_addr, peer_addr) =
275                        tr.transaction_type
276                    {
277                        let mut relay = Relay {
278                            relayed_addr,
279                            client: self,
280                        };
281                        relay.handle_create_permission_response(msg, peer_addr)?;
282                    }
283                }
284                METHOD_REFRESH => {
285                    if let TransactionType::RefreshRequest(relayed_addr) = tr.transaction_type {
286                        let mut relay = Relay {
287                            relayed_addr,
288                            client: self,
289                        };
290                        relay.handle_refresh_allocation_response(msg)?;
291                    }
292                }
293                METHOD_CHANNEL_BIND => {
294                    if let TransactionType::ChannelBindRequest(relayed_addr, bind_addr) =
295                        tr.transaction_type
296                    {
297                        let mut relay = Relay {
298                            relayed_addr,
299                            client: self,
300                        };
301                        relay.handle_channel_bind_response(msg, bind_addr)?;
302                    }
303                }
304                _ => {}
305            }
306        }
307
308        Ok(())
309    }
310
311    fn handle_channel_data(&mut self, data: &[u8]) -> Result<()> {
312        let mut ch_data = ChannelData {
313            raw: data.to_vec(),
314            ..Default::default()
315        };
316        ch_data.decode()?;
317
318        let addr = self
319            .find_addr_by_channel_number(ch_data.number.0)
320            .ok_or(Error::ErrChannelBindNotFound)?;
321
322        trace!(
323            "channel data received from {} (ch={})",
324            addr, ch_data.number.0
325        );
326
327        self.events.push_back(Event::DataIndicationOrChannelData(
328            Some(ch_data.number),
329            addr,
330            BytesMut::from(&ch_data.data[..]),
331        ));
332
333        Ok(())
334    }
335
336    pub fn relay(&mut self, relayed_addr: SocketAddr) -> Result<Relay<'_>> {
337        if !self.relays.contains_key(&relayed_addr) {
338            Err(Error::ErrStreamNotExisted)
339        } else {
340            Ok(Relay {
341                relayed_addr,
342                client: self,
343            })
344        }
345    }
346
347    /// send_binding_request_to sends a new STUN request to the given transport address
348    /// return key to find out corresponding Event either BindingResponse or BindingRequestTimeout
349    pub fn send_binding_request_to(&mut self, to: SocketAddr) -> Result<TransactionId> {
350        let msg = {
351            let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
352                vec![
353                    Box::new(TransactionId::new()),
354                    Box::new(BINDING_REQUEST),
355                    Box::new(self.software.clone()),
356                ]
357            } else {
358                vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
359            };
360
361            let mut msg = Message::new();
362            msg.build(&attrs)?;
363            msg
364        };
365
366        debug!("client.SendBindingRequestTo call PerformTransaction 1");
367        Ok(self.perform_transaction(&msg, to, TransactionType::BindingRequest))
368    }
369
370    /// send_binding_request sends a new STUN request to the STUN server
371    /// return key to find out corresponding Event either BindingResponse or BindingRequestTimeout
372    pub fn send_binding_request(&mut self) -> Result<TransactionId> {
373        if let Some(stun_serv_addr) = &self.stun_serv_addr {
374            self.send_binding_request_to(*stun_serv_addr)
375        } else {
376            Err(Error::ErrStunserverAddressNotSet)
377        }
378    }
379
380    // find_addr_by_channel_number returns a peer address associated with the
381    // channel number on this UDPConn
382    fn find_addr_by_channel_number(&self, ch_num: u16) -> Option<SocketAddr> {
383        self.binding_mgr.find_by_number(ch_num).map(|b| b.addr)
384    }
385
386    // stun_server_addr return the STUN server address
387    fn stun_server_addr(&self) -> Option<SocketAddr> {
388        self.stun_serv_addr
389    }
390
391    /* https://datatracker.ietf.org/doc/html/rfc8656#section-20
392    TURN                                 TURN          Peer         Peer
393    client                               server         A            B
394      |                                    |            |            |
395      |--- Allocate request -------------->|            |            |
396      |    Transaction-Id=0xA56250D3F17ABE679422DE85    |            |
397      |    SOFTWARE="Example client, version 1.03"      |            |
398      |    LIFETIME=3600 (1 hour)          |            |            |
399      |    REQUESTED-TRANSPORT=17 (UDP)    |            |            |
400      |    DONT-FRAGMENT                   |            |            |
401      |                                    |            |            |
402      |<-- Allocate error response --------|            |            |
403      |    Transaction-Id=0xA56250D3F17ABE679422DE85    |            |
404      |    SOFTWARE="Example server, version 1.17"      |            |
405      |    ERROR-CODE=401 (Unauthorized)   |            |            |
406      |    REALM="example.com"             |            |            |
407      |    NONCE="obMatJos2gAAAadl7W7PeDU4hKE72jda"     |            |
408      |    PASSWORD-ALGORITHMS=MD5 and SHA256           |            |
409      |                                    |            |            |
410      |--- Allocate request -------------->|            |            |
411      |    Transaction-Id=0xC271E932AD7446A32C234492    |            |
412      |    SOFTWARE="Example client 1.03"  |            |            |
413      |    LIFETIME=3600 (1 hour)          |            |            |
414      |    REQUESTED-TRANSPORT=17 (UDP)    |            |            |
415      |    DONT-FRAGMENT                   |            |            |
416      |    USERNAME="George"               |            |            |
417      |    REALM="example.com"             |            |            |
418      |    NONCE="obMatJos2gAAAadl7W7PeDU4hKE72jda"     |            |
419      |    PASSWORD-ALGORITHMS=MD5 and SHA256           |            |
420      |    PASSWORD-ALGORITHM=SHA256       |            |            |
421      |    MESSAGE-INTEGRITY=...           |            |            |
422      |    MESSAGE-INTEGRITY-SHA256=...    |            |            |
423      |                                    |            |            |
424      |<-- Allocate success response ------|            |            |
425      |    Transaction-Id=0xC271E932AD7446A32C234492    |            |
426      |    SOFTWARE="Example server, version 1.17"      |            |
427      |    LIFETIME=1200 (20 minutes)      |            |            |
428      |    XOR-RELAYED-ADDRESS=192.0.2.15:50000         |            |
429      |    XOR-MAPPED-ADDRESS=192.0.2.1:7000            |            |
430      |    MESSAGE-INTEGRITY-SHA256=...    |            |            |
431    */
432    /// Allocate sends a TURN allocation request to the given transport address
433    pub fn allocate(&mut self) -> Result<TransactionId> {
434        let mut msg = Message::new();
435        msg.build(&[
436            Box::new(TransactionId::new()),
437            Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
438            Box::new(RequestedTransport {
439                protocol: if self.transport_protocol == TransportProtocol::UDP {
440                    PROTO_UDP
441                } else {
442                    PROTO_TCP
443                },
444            }),
445            Box::new(FINGERPRINT),
446        ])?;
447
448        debug!("client.Allocate call PerformTransaction 1");
449        let mut tid = self.perform_transaction(
450            &msg,
451            self.turn_server_addr()?,
452            TransactionType::AllocateAttempt,
453        );
454        tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
455        Ok(tid)
456    }
457
458    fn handle_allocate_response(
459        &mut self,
460        response: Message,
461        allocate_state: TransactionType,
462    ) -> Result<()> {
463        match allocate_state {
464            TransactionType::AllocateAttempt => {
465                // Anonymous allocate failed, trying to authenticate.
466                let nonce = match Nonce::get_from_as(&response, ATTR_NONCE) {
467                    Ok(nonce) => nonce,
468                    Err(err) => {
469                        self.events
470                            .push_back(Event::AllocateError(response.transaction_id, err));
471                        return Ok(());
472                    }
473                };
474                self.realm = match Realm::get_from_as(&response, ATTR_REALM) {
475                    Ok(realm) => realm,
476                    Err(err) => {
477                        self.events
478                            .push_back(Event::AllocateError(response.transaction_id, err));
479                        return Ok(());
480                    }
481                };
482
483                self.integrity = MessageIntegrity::new_long_term_integrity(
484                    self.username.text.clone(),
485                    self.realm.text.clone(),
486                    self.password.clone(),
487                );
488
489                let mut msg = Message::new();
490
491                // make it same as allocate() return value so that client can retrieve it
492                // from Event::AllocateResponse
493                let mut tid = response.transaction_id;
494                tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
495
496                // Trying to authorize.
497                msg.build(&[
498                    Box::new(tid),
499                    Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
500                    Box::new(RequestedTransport {
501                        protocol: if self.transport_protocol == TransportProtocol::UDP {
502                            PROTO_UDP
503                        } else {
504                            PROTO_TCP
505                        },
506                    }),
507                    Box::new(self.username.clone()),
508                    Box::new(self.realm.clone()),
509                    Box::new(nonce.clone()),
510                    Box::new(self.integrity.clone()),
511                    Box::new(FINGERPRINT),
512                ])?;
513
514                debug!("client.Allocate call PerformTransaction 2");
515                self.perform_transaction(
516                    &msg,
517                    self.turn_server_addr()?,
518                    TransactionType::AllocateRequest(nonce),
519                );
520            }
521            TransactionType::AllocateRequest(nonce) => {
522                if response.typ.class == CLASS_ERROR_RESPONSE {
523                    let mut code = ErrorCodeAttribute::default();
524                    let err = if code.get_from(&response).is_err() {
525                        Error::Other(format!("{}", response.typ))
526                    } else {
527                        Error::Other(format!("{} (error {})", response.typ, code))
528                    };
529                    self.events
530                        .push_back(Event::AllocateError(response.transaction_id, err));
531                    return Ok(());
532                }
533
534                // Getting relayed addresses from response.
535                let mut relayed = RelayedAddress::default();
536                relayed.get_from(&response)?;
537                let relayed_addr = RelayedAddr::new(relayed.ip, relayed.port);
538
539                // Getting lifetime from response
540                let mut lifetime = Lifetime::default();
541                lifetime.get_from(&response)?;
542
543                self.relays.insert(
544                    relayed_addr,
545                    RelayState::new(relayed_addr, self.integrity.clone(), nonce, lifetime.0),
546                );
547                self.events.push_back(Event::AllocateResponse(
548                    response.transaction_id,
549                    relayed_addr,
550                ));
551            }
552            _ => {}
553        }
554        Ok(())
555    }
556
557    /// turn_server_addr return the TURN server address
558    fn turn_server_addr(&self) -> Result<SocketAddr> {
559        self.turn_serv_addr.ok_or(Error::ErrNilTurnSocket)
560    }
561
562    /// username returns username
563    fn username(&self) -> Username {
564        self.username.clone()
565    }
566
567    /// realm return realm
568    fn realm(&self) -> Realm {
569        self.realm.clone()
570    }
571
572    /// WriteTo sends data to the specified destination using the base socket.
573    fn write_to(&mut self, data: &[u8], remote: SocketAddr) {
574        self.transmits.push_back(TransportMessage {
575            now: Instant::now(),
576            transport: TransportContext {
577                local_addr: self.local_addr,
578                peer_addr: remote,
579                transport_protocol: self.transport_protocol,
580                ecn: None,
581            },
582            message: BytesMut::from(data),
583        });
584    }
585
586    // PerformTransaction performs STUN transaction
587    fn perform_transaction(
588        &mut self,
589        msg: &Message,
590        to: SocketAddr,
591        transaction_type: TransactionType,
592    ) -> TransactionId {
593        let tr = Transaction::new(TransactionConfig {
594            transaction_id: msg.transaction_id,
595            transaction_type,
596            raw: BytesMut::from(&msg.raw[..]),
597            local_addr: self.local_addr,
598            peer_addr: to,
599            transport_protocol: self.transport_protocol,
600            interval: self.rto_in_ms,
601        });
602
603        trace!(
604            "start {} transaction {:?} to {}",
605            msg.typ, msg.transaction_id, tr.peer_addr
606        );
607        self.tr_map.insert(msg.transaction_id, tr);
608
609        self.write_to(&msg.raw, to);
610
611        msg.transaction_id
612    }
613}