1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod permission;
6pub mod relay;
7pub mod transaction;
8
9use bytes::BytesMut;
10use log::{debug, trace};
11use std::collections::{HashMap, VecDeque};
12use std::net::SocketAddr;
13use std::time::Instant;
14
15use stun::attributes::*;
16use stun::integrity::*;
17use stun::message::*;
18use stun::textattrs::*;
19use stun::xoraddr::*;
20
21use binding::*;
22use transaction::*;
23
24use crate::client::relay::{Relay, RelayState};
25use crate::proto::chandata::*;
26use crate::proto::channum::ChannelNumber;
27use crate::proto::data::*;
28use crate::proto::lifetime::Lifetime;
29use crate::proto::peeraddr::*;
30use crate::proto::relayaddr::RelayedAddress;
31use crate::proto::reqtrans::RequestedTransport;
32use crate::proto::{PROTO_TCP, PROTO_UDP};
33use shared::error::{Error, Result};
34use shared::util::lookup_host;
35use shared::{TransportContext, TransportMessage, TransportProtocol};
36use stun::error_code::ErrorCodeAttribute;
37use stun::fingerprint::FINGERPRINT;
38
39const DEFAULT_RTO_IN_MS: u64 = 200;
40const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize; const MAX_READ_QUEUE_SIZE: usize = 1024;
42
43pub type RelayedAddr = SocketAddr;
44pub type ReflexiveAddr = SocketAddr;
45pub type PeerAddr = SocketAddr;
46
47#[derive(Debug)]
48pub enum Event {
49 TransactionTimeout(TransactionId),
50
51 BindingResponse(TransactionId, ReflexiveAddr),
52 BindingError(TransactionId, Error),
53
54 AllocateResponse(TransactionId, RelayedAddr),
55 AllocateError(TransactionId, Error),
56
57 CreatePermissionResponse(TransactionId, PeerAddr),
58 CreatePermissionError(TransactionId, Error),
59
60 DataIndicationOrChannelData(Option<ChannelNumber>, PeerAddr, BytesMut),
61}
62
63enum AllocateState {
64 Attempting,
65 Requesting(TextAttribute),
66}
67
68pub struct ClientConfig {
80 pub stun_serv_addr: String, pub turn_serv_addr: String, pub local_addr: SocketAddr,
83 pub transport_protocol: TransportProtocol,
84 pub username: String,
85 pub password: String,
86 pub realm: String,
87 pub software: String,
88 pub rto_in_ms: u64,
89}
90
91pub struct Client {
93 stun_serv_addr: Option<SocketAddr>,
94 turn_serv_addr: Option<SocketAddr>,
95 local_addr: SocketAddr,
96 transport_protocol: TransportProtocol,
97 username: Username,
98 password: String,
99 realm: Realm,
100 integrity: MessageIntegrity,
101 software: Software,
102 tr_map: TransactionMap,
103 binding_mgr: BindingManager,
104 rto_in_ms: u64,
105
106 relays: HashMap<RelayedAddr, RelayState>,
107 transmits: VecDeque<TransportMessage<BytesMut>>,
108 events: VecDeque<Event>,
109}
110
111impl Client {
112 pub fn new(config: ClientConfig) -> Result<Self> {
114 let stun_serv_addr = if config.stun_serv_addr.is_empty() {
115 None
116 } else {
117 Some(lookup_host(
118 config.local_addr.is_ipv4(),
119 config.stun_serv_addr.as_str(),
120 )?)
121 };
122
123 let turn_serv_addr = if config.turn_serv_addr.is_empty() {
124 None
125 } else {
126 Some(lookup_host(
127 config.local_addr.is_ipv4(),
128 config.turn_serv_addr.as_str(),
129 )?)
130 };
131
132 Ok(Client {
133 stun_serv_addr,
134 turn_serv_addr,
135 local_addr: config.local_addr,
136 transport_protocol: config.transport_protocol,
137 username: Username::new(ATTR_USERNAME, config.username),
138 password: config.password,
139 realm: Realm::new(ATTR_REALM, config.realm),
140 software: Software::new(ATTR_SOFTWARE, config.software),
141 tr_map: TransactionMap::new(),
142 binding_mgr: BindingManager::new(),
143 rto_in_ms: if config.rto_in_ms != 0 {
144 config.rto_in_ms
145 } else {
146 DEFAULT_RTO_IN_MS
147 },
148 integrity: MessageIntegrity::new_short_term_integrity(String::new()),
149
150 relays: HashMap::new(),
151 transmits: VecDeque::new(),
152 events: VecDeque::new(),
153 })
154 }
155
156 pub fn poll_timout(&mut self) -> Option<Instant> {
157 let mut eto = None;
158 if let Some(to) = self.tr_map.poll_timout()
159 && (eto.is_none() || to < *eto.as_ref().unwrap())
160 {
161 eto = Some(to);
162 }
163
164 #[allow(clippy::map_clone)]
165 let relayed_addrs: Vec<SocketAddr> = self.relays.keys().map(|key| *key).collect();
166 for relayed_addr in relayed_addrs {
167 let relay = Relay {
168 relayed_addr,
169 client: self,
170 };
171 if let Some(to) = relay.poll_timeout()
172 && (eto.is_none() || to < *eto.as_ref().unwrap())
173 {
174 eto = Some(to);
175 }
176 }
177
178 eto
179 }
180
181 pub fn handle_timeout(&mut self, now: Instant) {
182 self.tr_map.handle_timeout(now);
183
184 #[allow(clippy::map_clone)]
185 let relayed_addrs: Vec<SocketAddr> = self.relays.keys().map(|key| *key).collect();
186 for relayed_addr in relayed_addrs {
187 let mut relay = Relay {
188 relayed_addr,
189 client: self,
190 };
191 relay.handle_timeout(now);
192 }
193 }
194
195 pub fn poll_transmit(&mut self) -> Option<TransportMessage<BytesMut>> {
196 while let Some(transmit) = self.tr_map.poll_transmit() {
197 self.transmits.push_back(transmit);
198 }
199 self.transmits.pop_front()
200 }
201
202 pub fn handle_transmit(&mut self, msg: TransportMessage<BytesMut>) -> Result<()> {
203 self.handle_inbound(&msg.message[..], msg.transport.peer_addr)
204 }
205
206 pub fn poll_event(&mut self) -> Option<Event> {
207 while let Some(event) = self.tr_map.poll_event() {
208 self.events.push_back(event);
209 }
210 self.events.pop_front()
211 }
212
213 fn handle_inbound(&mut self, data: &[u8], from: SocketAddr) -> Result<()> {
221 if is_stun_message(data) {
240 self.handle_stun_message(data)
241 } else if ChannelData::is_channel_data(data) {
242 self.handle_channel_data(data)
243 } else if self.stun_serv_addr.is_some() && &from == self.stun_serv_addr.as_ref().unwrap() {
244 Err(Error::ErrNonStunmessage)
246 } else {
247 trace!("non-STUN/TURN packet, unhandled");
249 Ok(())
250 }
251 }
252
253 fn handle_stun_message(&mut self, data: &[u8]) -> Result<()> {
254 let mut msg = Message::new();
255 msg.raw = data.to_vec();
256 msg.decode()?;
257
258 if msg.typ.class == CLASS_REQUEST {
259 return Err(Error::Other(format!(
260 "{:?} : {}",
261 Error::ErrUnexpectedStunrequestMessage,
262 msg
263 )));
264 }
265
266 if msg.typ.class == CLASS_INDICATION {
267 if msg.typ.method == METHOD_DATA {
268 let mut peer_addr = PeerAddress::default();
269 peer_addr.get_from(&msg)?;
270 let from = SocketAddr::new(peer_addr.ip, peer_addr.port);
271
272 let mut data = Data::default();
273 data.get_from(&msg)?;
274
275 debug!("data indication received from {}", from);
276
277 self.events.push_back(Event::DataIndicationOrChannelData(
278 None,
279 from,
280 BytesMut::from(&data.0[..]),
281 ))
282 }
283
284 return Ok(());
285 }
286
287 if self.tr_map.find(&msg.transaction_id).is_none() {
293 debug!("no transaction for {}", msg);
295 return Ok(());
296 }
297
298 if let Some(tr) = self.tr_map.delete(&msg.transaction_id) {
299 match msg.typ.method {
300 METHOD_BINDING => {
301 if msg.typ.class == CLASS_ERROR_RESPONSE {
302 let mut code = ErrorCodeAttribute::default();
303 let err = if code.get_from(&msg).is_err() {
304 Error::Other(format!("{}", msg.typ))
305 } else {
306 Error::Other(format!("{} (error {})", msg.typ, code))
307 };
308 self.events
309 .push_back(Event::BindingError(tr.transaction_id, err));
310 } else {
311 let mut refl_addr = XorMappedAddress::default();
312 match refl_addr.get_from(&msg) {
313 Ok(_) => {
314 self.events.push_back(Event::BindingResponse(
315 tr.transaction_id,
316 ReflexiveAddr::new(refl_addr.ip, refl_addr.port),
317 ));
318 }
319 Err(err) => {
320 self.events
321 .push_back(Event::BindingError(tr.transaction_id, err));
322 }
323 }
324 }
325 }
326 METHOD_ALLOCATE => {
327 self.handle_allocate_response(msg, tr.transaction_type)?;
328 }
329 METHOD_CREATE_PERMISSION => {
330 if let TransactionType::CreatePermissionRequest(relayed_addr, peer_addr) =
331 tr.transaction_type
332 {
333 let mut relay = Relay {
334 relayed_addr,
335 client: self,
336 };
337 relay.handle_create_permission_response(msg, peer_addr)?;
338 }
339 }
340 METHOD_REFRESH => {
341 if let TransactionType::RefreshRequest(relayed_addr) = tr.transaction_type {
342 let mut relay = Relay {
343 relayed_addr,
344 client: self,
345 };
346 relay.handle_refresh_allocation_response(msg)?;
347 }
348 }
349 METHOD_CHANNEL_BIND => {
350 if let TransactionType::ChannelBindRequest(relayed_addr, bind_addr) =
351 tr.transaction_type
352 {
353 let mut relay = Relay {
354 relayed_addr,
355 client: self,
356 };
357 relay.handle_channel_bind_response(msg, bind_addr)?;
358 }
359 }
360 _ => {}
361 }
362 }
363
364 Ok(())
365 }
366
367 fn handle_channel_data(&mut self, data: &[u8]) -> Result<()> {
368 let mut ch_data = ChannelData {
369 raw: data.to_vec(),
370 ..Default::default()
371 };
372 ch_data.decode()?;
373
374 let addr = self
375 .find_addr_by_channel_number(ch_data.number.0)
376 .ok_or(Error::ErrChannelBindNotFound)?;
377
378 trace!(
379 "channel data received from {} (ch={})",
380 addr, ch_data.number.0
381 );
382
383 self.events.push_back(Event::DataIndicationOrChannelData(
384 Some(ch_data.number),
385 addr,
386 BytesMut::from(&ch_data.data[..]),
387 ));
388
389 Ok(())
390 }
391
392 pub fn close(&mut self) {
394 self.tr_map.delete_all();
395 }
396
397 pub fn relay(&mut self, relayed_addr: SocketAddr) -> Result<Relay<'_>> {
398 if !self.relays.contains_key(&relayed_addr) {
399 Err(Error::ErrStreamNotExisted)
400 } else {
401 Ok(Relay {
402 relayed_addr,
403 client: self,
404 })
405 }
406 }
407
408 pub fn send_binding_request_to(&mut self, to: SocketAddr) -> Result<TransactionId> {
411 let msg = {
412 let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
413 vec![
414 Box::new(TransactionId::new()),
415 Box::new(BINDING_REQUEST),
416 Box::new(self.software.clone()),
417 ]
418 } else {
419 vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
420 };
421
422 let mut msg = Message::new();
423 msg.build(&attrs)?;
424 msg
425 };
426
427 debug!("client.SendBindingRequestTo call PerformTransaction 1");
428 Ok(self.perform_transaction(&msg, to, TransactionType::BindingRequest))
429 }
430
431 pub fn send_binding_request(&mut self) -> Result<TransactionId> {
434 if let Some(stun_serv_addr) = &self.stun_serv_addr {
435 self.send_binding_request_to(*stun_serv_addr)
436 } else {
437 Err(Error::ErrStunserverAddressNotSet)
438 }
439 }
440
441 fn find_addr_by_channel_number(&self, ch_num: u16) -> Option<SocketAddr> {
444 self.binding_mgr.find_by_number(ch_num).map(|b| b.addr)
445 }
446
447 fn stun_server_addr(&self) -> Option<SocketAddr> {
449 self.stun_serv_addr
450 }
451
452 pub fn allocate(&mut self) -> Result<TransactionId> {
495 let mut msg = Message::new();
496 msg.build(&[
497 Box::new(TransactionId::new()),
498 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
499 Box::new(RequestedTransport {
500 protocol: if self.transport_protocol == TransportProtocol::UDP {
501 PROTO_UDP
502 } else {
503 PROTO_TCP
504 },
505 }),
506 Box::new(FINGERPRINT),
507 ])?;
508
509 debug!("client.Allocate call PerformTransaction 1");
510 let mut tid = self.perform_transaction(
511 &msg,
512 self.turn_server_addr()?,
513 TransactionType::AllocateAttempt,
514 );
515 tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
516 Ok(tid)
517 }
518
519 fn handle_allocate_response(
520 &mut self,
521 response: Message,
522 allocate_state: TransactionType,
523 ) -> Result<()> {
524 match allocate_state {
525 TransactionType::AllocateAttempt => {
526 let nonce = match Nonce::get_from_as(&response, ATTR_NONCE) {
528 Ok(nonce) => nonce,
529 Err(err) => {
530 self.events
531 .push_back(Event::AllocateError(response.transaction_id, err));
532 return Ok(());
533 }
534 };
535 self.realm = match Realm::get_from_as(&response, ATTR_REALM) {
536 Ok(realm) => realm,
537 Err(err) => {
538 self.events
539 .push_back(Event::AllocateError(response.transaction_id, err));
540 return Ok(());
541 }
542 };
543
544 self.integrity = MessageIntegrity::new_long_term_integrity(
545 self.username.text.clone(),
546 self.realm.text.clone(),
547 self.password.clone(),
548 );
549
550 let mut msg = Message::new();
551
552 let mut tid = response.transaction_id;
555 tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
556
557 msg.build(&[
559 Box::new(tid),
560 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
561 Box::new(RequestedTransport {
562 protocol: if self.transport_protocol == TransportProtocol::UDP {
563 PROTO_UDP
564 } else {
565 PROTO_TCP
566 },
567 }),
568 Box::new(self.username.clone()),
569 Box::new(self.realm.clone()),
570 Box::new(nonce.clone()),
571 Box::new(self.integrity.clone()),
572 Box::new(FINGERPRINT),
573 ])?;
574
575 debug!("client.Allocate call PerformTransaction 2");
576 self.perform_transaction(
577 &msg,
578 self.turn_server_addr()?,
579 TransactionType::AllocateRequest(nonce),
580 );
581 }
582 TransactionType::AllocateRequest(nonce) => {
583 if response.typ.class == CLASS_ERROR_RESPONSE {
584 let mut code = ErrorCodeAttribute::default();
585 let err = if code.get_from(&response).is_err() {
586 Error::Other(format!("{}", response.typ))
587 } else {
588 Error::Other(format!("{} (error {})", response.typ, code))
589 };
590 self.events
591 .push_back(Event::AllocateError(response.transaction_id, err));
592 return Ok(());
593 }
594
595 let mut relayed = RelayedAddress::default();
597 relayed.get_from(&response)?;
598 let relayed_addr = RelayedAddr::new(relayed.ip, relayed.port);
599
600 let mut lifetime = Lifetime::default();
602 lifetime.get_from(&response)?;
603
604 self.relays.insert(
605 relayed_addr,
606 RelayState::new(relayed_addr, self.integrity.clone(), nonce, lifetime.0),
607 );
608 self.events.push_back(Event::AllocateResponse(
609 response.transaction_id,
610 relayed_addr,
611 ));
612 }
613 _ => {}
614 }
615 Ok(())
616 }
617
618 fn turn_server_addr(&self) -> Result<SocketAddr> {
620 self.turn_serv_addr.ok_or(Error::ErrNilTurnSocket)
621 }
622
623 fn username(&self) -> Username {
625 self.username.clone()
626 }
627
628 fn realm(&self) -> Realm {
630 self.realm.clone()
631 }
632
633 fn write_to(&mut self, data: &[u8], remote: SocketAddr) {
635 self.transmits.push_back(TransportMessage {
636 now: Instant::now(),
637 transport: TransportContext {
638 local_addr: self.local_addr,
639 peer_addr: remote,
640 transport_protocol: self.transport_protocol,
641 ecn: None,
642 },
643 message: BytesMut::from(data),
644 });
645 }
646
647 fn perform_transaction(
649 &mut self,
650 msg: &Message,
651 to: SocketAddr,
652 transaction_type: TransactionType,
653 ) -> TransactionId {
654 let tr = Transaction::new(TransactionConfig {
655 transaction_id: msg.transaction_id,
656 transaction_type,
657 raw: BytesMut::from(&msg.raw[..]),
658 local_addr: self.local_addr,
659 peer_addr: to,
660 transport_protocol: self.transport_protocol,
661 interval: self.rto_in_ms,
662 });
663
664 trace!(
665 "start {} transaction {:?} to {}",
666 msg.typ, msg.transaction_id, tr.peer_addr
667 );
668 self.tr_map.insert(msg.transaction_id, tr);
669
670 self.write_to(&msg.raw, to);
671
672 msg.transaction_id
673 }
674}