1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod permission;
6mod proto;
7pub mod relay;
8pub mod transaction;
9
10use bytes::BytesMut;
11use log::{debug, trace};
12use std::collections::{HashMap, VecDeque};
13use std::net::SocketAddr;
14use std::time::Instant;
15
16use stun::attributes::*;
17use stun::integrity::*;
18use stun::message::*;
19use stun::textattrs::*;
20use stun::xoraddr::*;
21
22use binding::*;
23use transaction::*;
24
25use crate::client::relay::{Relay, RelayState};
26use crate::proto::chandata::*;
27use crate::proto::channum::ChannelNumber;
28use crate::proto::data::*;
29use crate::proto::lifetime::Lifetime;
30use crate::proto::peeraddr::*;
31use crate::proto::relayaddr::RelayedAddress;
32use crate::proto::reqtrans::RequestedTransport;
33use crate::proto::{PROTO_TCP, PROTO_UDP};
34use shared::error::{Error, Result};
35use shared::util::lookup_host;
36use shared::{TransportContext, TransportMessage, TransportProtocol};
37use stun::error_code::ErrorCodeAttribute;
38use stun::fingerprint::FINGERPRINT;
39
/// Retransmission timeout, in milliseconds, that `Client::new` falls back to
/// when `ClientConfig::rto_in_ms` is zero.
const DEFAULT_RTO_IN_MS: u64 = 200;

/// Size limit for a data buffer, in bytes: the largest value a `u16` can hold.
// NOTE(review): not referenced in the part of the file visible here — confirm
// where this limit is enforced.
const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize;

/// Size limit for a read queue, in entries.
// NOTE(review): not referenced in the part of the file visible here — confirm
// where this limit is enforced.
const MAX_READ_QUEUE_SIZE: usize = 1024;
43
/// Socket address taken from a relayed-address attribute of an Allocate
/// response; used as the key that identifies an allocation in the client.
pub type RelayedAddr = SocketAddr;

/// Socket address taken from the XOR-MAPPED-ADDRESS attribute of a Binding
/// response.
pub type ReflexiveAddr = SocketAddr;

/// Socket address of a remote peer, as carried by permission, data-indication
/// and channel-data events.
pub type PeerAddr = SocketAddr;
47
48#[derive(Debug)]
49pub enum Event {
50 TransactionTimeout(TransactionId),
51
52 BindingResponse(TransactionId, ReflexiveAddr),
53 BindingError(TransactionId, Error),
54
55 AllocateResponse(TransactionId, RelayedAddr),
56 AllocateError(TransactionId, Error),
57
58 CreatePermissionResponse(TransactionId, PeerAddr),
59 CreatePermissionError(TransactionId, Error),
60
61 DataIndicationOrChannelData(Option<ChannelNumber>, PeerAddr, BytesMut),
62}
63
64enum AllocateState {
65 Attempting,
66 Requesting(TextAttribute),
67}
68
69pub struct ClientConfig {
81 pub stun_serv_addr: String, pub turn_serv_addr: String, pub local_addr: SocketAddr,
84 pub transport_protocol: TransportProtocol,
85 pub username: String,
86 pub password: String,
87 pub realm: String,
88 pub software: String,
89 pub rto_in_ms: u64,
90}
91
92pub struct Client {
94 stun_serv_addr: Option<SocketAddr>,
95 turn_serv_addr: Option<SocketAddr>,
96 local_addr: SocketAddr,
97 transport_protocol: TransportProtocol,
98 username: Username,
99 password: String,
100 realm: Realm,
101 integrity: MessageIntegrity,
102 software: Software,
103 tr_map: TransactionMap,
104 binding_mgr: BindingManager,
105 rto_in_ms: u64,
106
107 relays: HashMap<RelayedAddr, RelayState>,
108 transmits: VecDeque<TransportMessage<BytesMut>>,
109 events: VecDeque<Event>,
110}
111
112impl Client {
113 pub fn new(config: ClientConfig) -> Result<Self> {
115 let stun_serv_addr = if config.stun_serv_addr.is_empty() {
116 None
117 } else {
118 Some(lookup_host(
119 config.local_addr.is_ipv4(),
120 config.stun_serv_addr.as_str(),
121 )?)
122 };
123
124 let turn_serv_addr = if config.turn_serv_addr.is_empty() {
125 None
126 } else {
127 Some(lookup_host(
128 config.local_addr.is_ipv4(),
129 config.turn_serv_addr.as_str(),
130 )?)
131 };
132
133 Ok(Client {
134 stun_serv_addr,
135 turn_serv_addr,
136 local_addr: config.local_addr,
137 transport_protocol: config.transport_protocol,
138 username: Username::new(ATTR_USERNAME, config.username),
139 password: config.password,
140 realm: Realm::new(ATTR_REALM, config.realm),
141 software: Software::new(ATTR_SOFTWARE, config.software),
142 tr_map: TransactionMap::new(),
143 binding_mgr: BindingManager::new(),
144 rto_in_ms: if config.rto_in_ms != 0 {
145 config.rto_in_ms
146 } else {
147 DEFAULT_RTO_IN_MS
148 },
149 integrity: MessageIntegrity::new_short_term_integrity(String::new()),
150
151 relays: HashMap::new(),
152 transmits: VecDeque::new(),
153 events: VecDeque::new(),
154 })
155 }
156
157 fn handle_inbound(&mut self, data: &[u8], from: SocketAddr) -> Result<()> {
165 if is_stun_message(data) {
184 self.handle_stun_message(data)
185 } else if ChannelData::is_channel_data(data) {
186 self.handle_channel_data(data)
187 } else if self.stun_serv_addr.is_some() && &from == self.stun_serv_addr.as_ref().unwrap() {
188 Err(Error::ErrNonStunmessage)
190 } else {
191 trace!("non-STUN/TURN packet, unhandled");
193 Ok(())
194 }
195 }
196
197 fn handle_stun_message(&mut self, data: &[u8]) -> Result<()> {
198 let mut msg = Message::new();
199 msg.raw = data.to_vec();
200 msg.decode()?;
201
202 if msg.typ.class == CLASS_REQUEST {
203 return Err(Error::Other(format!(
204 "{:?} : {}",
205 Error::ErrUnexpectedStunrequestMessage,
206 msg
207 )));
208 }
209
210 if msg.typ.class == CLASS_INDICATION {
211 if msg.typ.method == METHOD_DATA {
212 let mut peer_addr = PeerAddress::default();
213 peer_addr.get_from(&msg)?;
214 let from = SocketAddr::new(peer_addr.ip, peer_addr.port);
215
216 let mut data = Data::default();
217 data.get_from(&msg)?;
218
219 debug!("data indication received from {}", from);
220
221 self.events.push_back(Event::DataIndicationOrChannelData(
222 None,
223 from,
224 BytesMut::from(&data.0[..]),
225 ))
226 }
227
228 return Ok(());
229 }
230
231 if self.tr_map.find(&msg.transaction_id).is_none() {
237 debug!("no transaction for {}", msg);
239 return Ok(());
240 }
241
242 if let Some(tr) = self.tr_map.delete(&msg.transaction_id) {
243 match msg.typ.method {
244 METHOD_BINDING => {
245 if msg.typ.class == CLASS_ERROR_RESPONSE {
246 let mut code = ErrorCodeAttribute::default();
247 let err = if code.get_from(&msg).is_err() {
248 Error::Other(format!("{}", msg.typ))
249 } else {
250 Error::Other(format!("{} (error {})", msg.typ, code))
251 };
252 self.events
253 .push_back(Event::BindingError(tr.transaction_id, err));
254 } else {
255 let mut refl_addr = XorMappedAddress::default();
256 match refl_addr.get_from(&msg) {
257 Ok(_) => {
258 self.events.push_back(Event::BindingResponse(
259 tr.transaction_id,
260 ReflexiveAddr::new(refl_addr.ip, refl_addr.port),
261 ));
262 }
263 Err(err) => {
264 self.events
265 .push_back(Event::BindingError(tr.transaction_id, err));
266 }
267 }
268 }
269 }
270 METHOD_ALLOCATE => {
271 self.handle_allocate_response(msg, tr.transaction_type)?;
272 }
273 METHOD_CREATE_PERMISSION => {
274 if let TransactionType::CreatePermissionRequest(relayed_addr, peer_addr) =
275 tr.transaction_type
276 {
277 let mut relay = Relay {
278 relayed_addr,
279 client: self,
280 };
281 relay.handle_create_permission_response(msg, peer_addr)?;
282 }
283 }
284 METHOD_REFRESH => {
285 if let TransactionType::RefreshRequest(relayed_addr) = tr.transaction_type {
286 let mut relay = Relay {
287 relayed_addr,
288 client: self,
289 };
290 relay.handle_refresh_allocation_response(msg)?;
291 }
292 }
293 METHOD_CHANNEL_BIND => {
294 if let TransactionType::ChannelBindRequest(relayed_addr, bind_addr) =
295 tr.transaction_type
296 {
297 let mut relay = Relay {
298 relayed_addr,
299 client: self,
300 };
301 relay.handle_channel_bind_response(msg, bind_addr)?;
302 }
303 }
304 _ => {}
305 }
306 }
307
308 Ok(())
309 }
310
311 fn handle_channel_data(&mut self, data: &[u8]) -> Result<()> {
312 let mut ch_data = ChannelData {
313 raw: data.to_vec(),
314 ..Default::default()
315 };
316 ch_data.decode()?;
317
318 let addr = self
319 .find_addr_by_channel_number(ch_data.number.0)
320 .ok_or(Error::ErrChannelBindNotFound)?;
321
322 trace!(
323 "channel data received from {} (ch={})",
324 addr, ch_data.number.0
325 );
326
327 self.events.push_back(Event::DataIndicationOrChannelData(
328 Some(ch_data.number),
329 addr,
330 BytesMut::from(&ch_data.data[..]),
331 ));
332
333 Ok(())
334 }
335
336 pub fn relay(&mut self, relayed_addr: SocketAddr) -> Result<Relay<'_>> {
337 if !self.relays.contains_key(&relayed_addr) {
338 Err(Error::ErrStreamNotExisted)
339 } else {
340 Ok(Relay {
341 relayed_addr,
342 client: self,
343 })
344 }
345 }
346
347 pub fn send_binding_request_to(&mut self, to: SocketAddr) -> Result<TransactionId> {
350 let msg = {
351 let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
352 vec![
353 Box::new(TransactionId::new()),
354 Box::new(BINDING_REQUEST),
355 Box::new(self.software.clone()),
356 ]
357 } else {
358 vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
359 };
360
361 let mut msg = Message::new();
362 msg.build(&attrs)?;
363 msg
364 };
365
366 debug!("client.SendBindingRequestTo call PerformTransaction 1");
367 Ok(self.perform_transaction(&msg, to, TransactionType::BindingRequest))
368 }
369
370 pub fn send_binding_request(&mut self) -> Result<TransactionId> {
373 if let Some(stun_serv_addr) = &self.stun_serv_addr {
374 self.send_binding_request_to(*stun_serv_addr)
375 } else {
376 Err(Error::ErrStunserverAddressNotSet)
377 }
378 }
379
380 fn find_addr_by_channel_number(&self, ch_num: u16) -> Option<SocketAddr> {
383 self.binding_mgr.find_by_number(ch_num).map(|b| b.addr)
384 }
385
386 fn stun_server_addr(&self) -> Option<SocketAddr> {
388 self.stun_serv_addr
389 }
390
391 pub fn allocate(&mut self) -> Result<TransactionId> {
434 let mut msg = Message::new();
435 msg.build(&[
436 Box::new(TransactionId::new()),
437 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
438 Box::new(RequestedTransport {
439 protocol: if self.transport_protocol == TransportProtocol::UDP {
440 PROTO_UDP
441 } else {
442 PROTO_TCP
443 },
444 }),
445 Box::new(FINGERPRINT),
446 ])?;
447
448 debug!("client.Allocate call PerformTransaction 1");
449 let mut tid = self.perform_transaction(
450 &msg,
451 self.turn_server_addr()?,
452 TransactionType::AllocateAttempt,
453 );
454 tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
455 Ok(tid)
456 }
457
458 fn handle_allocate_response(
459 &mut self,
460 response: Message,
461 allocate_state: TransactionType,
462 ) -> Result<()> {
463 match allocate_state {
464 TransactionType::AllocateAttempt => {
465 let nonce = match Nonce::get_from_as(&response, ATTR_NONCE) {
467 Ok(nonce) => nonce,
468 Err(err) => {
469 self.events
470 .push_back(Event::AllocateError(response.transaction_id, err));
471 return Ok(());
472 }
473 };
474 self.realm = match Realm::get_from_as(&response, ATTR_REALM) {
475 Ok(realm) => realm,
476 Err(err) => {
477 self.events
478 .push_back(Event::AllocateError(response.transaction_id, err));
479 return Ok(());
480 }
481 };
482
483 self.integrity = MessageIntegrity::new_long_term_integrity(
484 self.username.text.clone(),
485 self.realm.text.clone(),
486 self.password.clone(),
487 );
488
489 let mut msg = Message::new();
490
491 let mut tid = response.transaction_id;
494 tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
495
496 msg.build(&[
498 Box::new(tid),
499 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
500 Box::new(RequestedTransport {
501 protocol: if self.transport_protocol == TransportProtocol::UDP {
502 PROTO_UDP
503 } else {
504 PROTO_TCP
505 },
506 }),
507 Box::new(self.username.clone()),
508 Box::new(self.realm.clone()),
509 Box::new(nonce.clone()),
510 Box::new(self.integrity.clone()),
511 Box::new(FINGERPRINT),
512 ])?;
513
514 debug!("client.Allocate call PerformTransaction 2");
515 self.perform_transaction(
516 &msg,
517 self.turn_server_addr()?,
518 TransactionType::AllocateRequest(nonce),
519 );
520 }
521 TransactionType::AllocateRequest(nonce) => {
522 if response.typ.class == CLASS_ERROR_RESPONSE {
523 let mut code = ErrorCodeAttribute::default();
524 let err = if code.get_from(&response).is_err() {
525 Error::Other(format!("{}", response.typ))
526 } else {
527 Error::Other(format!("{} (error {})", response.typ, code))
528 };
529 self.events
530 .push_back(Event::AllocateError(response.transaction_id, err));
531 return Ok(());
532 }
533
534 let mut relayed = RelayedAddress::default();
536 relayed.get_from(&response)?;
537 let relayed_addr = RelayedAddr::new(relayed.ip, relayed.port);
538
539 let mut lifetime = Lifetime::default();
541 lifetime.get_from(&response)?;
542
543 self.relays.insert(
544 relayed_addr,
545 RelayState::new(relayed_addr, self.integrity.clone(), nonce, lifetime.0),
546 );
547 self.events.push_back(Event::AllocateResponse(
548 response.transaction_id,
549 relayed_addr,
550 ));
551 }
552 _ => {}
553 }
554 Ok(())
555 }
556
557 fn turn_server_addr(&self) -> Result<SocketAddr> {
559 self.turn_serv_addr.ok_or(Error::ErrNilTurnSocket)
560 }
561
562 fn username(&self) -> Username {
564 self.username.clone()
565 }
566
567 fn realm(&self) -> Realm {
569 self.realm.clone()
570 }
571
572 fn write_to(&mut self, data: &[u8], remote: SocketAddr) {
574 self.transmits.push_back(TransportMessage {
575 now: Instant::now(),
576 transport: TransportContext {
577 local_addr: self.local_addr,
578 peer_addr: remote,
579 transport_protocol: self.transport_protocol,
580 ecn: None,
581 },
582 message: BytesMut::from(data),
583 });
584 }
585
586 fn perform_transaction(
588 &mut self,
589 msg: &Message,
590 to: SocketAddr,
591 transaction_type: TransactionType,
592 ) -> TransactionId {
593 let tr = Transaction::new(TransactionConfig {
594 transaction_id: msg.transaction_id,
595 transaction_type,
596 raw: BytesMut::from(&msg.raw[..]),
597 local_addr: self.local_addr,
598 peer_addr: to,
599 transport_protocol: self.transport_protocol,
600 interval: self.rto_in_ms,
601 });
602
603 trace!(
604 "start {} transaction {:?} to {}",
605 msg.typ, msg.transaction_id, tr.peer_addr
606 );
607 self.tr_map.insert(msg.transaction_id, tr);
608
609 self.write_to(&msg.raw, to);
610
611 msg.transaction_id
612 }
613}