1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod periodic_timer;
6pub mod permission;
7pub mod relay_conn;
8pub mod transaction;
9
10use crate::errors::*;
11use crate::proto::{
12 chandata::*, data::*, lifetime::*, peeraddr::*, relayaddr::*, reqtrans::*, PROTO_UDP,
13};
14use binding::*;
15use relay_conn::*;
16use transaction::*;
17
18use stun::agent::*;
19use stun::attributes::*;
20use stun::error_code::*;
21use stun::fingerprint::*;
22use stun::integrity::*;
23use stun::message::*;
24use stun::textattrs::*;
25use stun::xoraddr::*;
26
27use std::sync::Arc;
28
29use std::net::SocketAddr;
30use std::str::FromStr;
31use tokio::sync::{mpsc, Mutex};
32use util::{conn::*, vnet::net::*, Error};
33
34use async_trait::async_trait;
35
/// Retransmission timeout in milliseconds, used when `ClientConfig::rto_in_ms` is zero.
const DEFAULT_RTO_IN_MS: u16 = 200;

/// Size in bytes of the buffer the read loop receives each datagram into.
const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize;

/// Capacity of the channel that queues inbound relayed data created by `allocate`.
const MAX_READ_QUEUE_SIZE: usize = 1024;
39
40pub struct ClientConfig {
52 pub stun_serv_addr: String, pub turn_serv_addr: String, pub username: String,
55 pub password: String,
56 pub realm: String,
57 pub software: String,
58 pub rto_in_ms: u16,
59 pub conn: Arc<dyn Conn + Send + Sync>,
60 pub vnet: Option<Arc<Net>>,
61}
62
63struct ClientInternal {
64 conn: Arc<dyn Conn + Send + Sync>,
65 stun_serv_addr: String,
66 turn_serv_addr: String,
67 username: Username,
68 password: String,
69 realm: Realm,
70 integrity: MessageIntegrity,
71 software: Software,
72 tr_map: Arc<Mutex<TransactionMap>>,
73 binding_mgr: Arc<Mutex<BindingManager>>,
74 rto_in_ms: u16,
75 read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
76}
77
78#[async_trait]
79impl RelayConnObserver for ClientInternal {
80 fn turn_server_addr(&self) -> String {
82 self.turn_serv_addr.clone()
83 }
84
85 fn username(&self) -> Username {
87 self.username.clone()
88 }
89
90 fn realm(&self) -> Realm {
92 self.realm.clone()
93 }
94
95 async fn write_to(&self, data: &[u8], to: &str) -> Result<usize, Error> {
97 let n = self.conn.send_to(data, SocketAddr::from_str(to)?).await?;
98 Ok(n)
99 }
100
101 async fn perform_transaction(
103 &mut self,
104 msg: &Message,
105 to: &str,
106 ignore_result: bool,
107 ) -> Result<TransactionResult, Error> {
108 let tr_key = base64::encode(&msg.transaction_id.0);
109
110 let mut tr = Transaction::new(TransactionConfig {
111 key: tr_key.clone(),
112 raw: msg.raw.clone(),
113 to: to.to_string(),
114 interval: self.rto_in_ms,
115 ignore_result,
116 });
117 let result_ch_rx = tr.get_result_channel();
118
119 log::trace!("start {} transaction {} to {}", msg.typ, tr_key, tr.to);
120 {
121 let mut tm = self.tr_map.lock().await;
122 tm.insert(tr_key.clone(), tr);
123 }
124
125 self.conn
126 .send_to(&msg.raw, SocketAddr::from_str(to)?)
127 .await?;
128
129 let conn2 = Arc::clone(&self.conn);
130 let tr_map2 = Arc::clone(&self.tr_map);
131 {
132 let mut tm = self.tr_map.lock().await;
133 if let Some(tr) = tm.get(&tr_key) {
134 tr.start_rtx_timer(conn2, tr_map2).await;
135 }
136 }
137
138 if ignore_result {
140 return Ok(TransactionResult::default());
141 }
142
143 if let Some(mut result_ch_rx) = result_ch_rx {
145 match result_ch_rx.recv().await {
146 Some(tr) => Ok(tr),
147 None => Err(ERR_TRANSACTION_CLOSED.to_owned()),
148 }
149 } else {
150 Err(ERR_WAIT_FOR_RESULT_ON_NON_RESULT_TRANSACTION.to_owned())
151 }
152 }
153}
154
155impl ClientInternal {
156 async fn new(config: ClientConfig) -> Result<Self, Error> {
158 let net = if let Some(vnet) = config.vnet {
159 if vnet.is_virtual() {
160 log::warn!("vnet is enabled");
161 }
162 vnet
163 } else {
164 Arc::new(Net::new(None))
165 };
166
167 let stun_serv_addr = if config.stun_serv_addr.is_empty() {
168 String::new()
169 } else {
170 log::debug!("resolving {}", config.stun_serv_addr);
171 let local_addr = config.conn.local_addr().await?;
172 let stun_serv = net
173 .resolve_addr(local_addr.is_ipv4(), &config.stun_serv_addr)
174 .await?;
175 log::debug!("stunServ: {}", stun_serv);
176 stun_serv.to_string()
177 };
178
179 let turn_serv_addr = if config.turn_serv_addr.is_empty() {
180 String::new()
181 } else {
182 log::debug!("resolving {}", config.turn_serv_addr);
183 let local_addr = config.conn.local_addr().await?;
184 let turn_serv = net
185 .resolve_addr(local_addr.is_ipv4(), &config.turn_serv_addr)
186 .await?;
187 log::debug!("turnServ: {}", turn_serv);
188 turn_serv.to_string()
189 };
190
191 Ok(ClientInternal {
192 conn: Arc::clone(&config.conn),
193 stun_serv_addr,
194 turn_serv_addr,
195 username: Username::new(ATTR_USERNAME, config.username),
196 password: config.password,
197 realm: Realm::new(ATTR_REALM, config.realm),
198 software: Software::new(ATTR_SOFTWARE, config.software),
199 tr_map: Arc::new(Mutex::new(TransactionMap::new())),
200 binding_mgr: Arc::new(Mutex::new(BindingManager::new())),
201 rto_in_ms: if config.rto_in_ms != 0 {
202 config.rto_in_ms
203 } else {
204 DEFAULT_RTO_IN_MS
205 },
206 integrity: MessageIntegrity::new_short_term_integrity(String::new()),
207 read_ch_tx: Arc::new(Mutex::new(None)),
208 })
209 }
210
211 fn stun_server_addr(&self) -> String {
213 self.stun_serv_addr.clone()
214 }
215
216 async fn listen(&self) -> Result<(), Error> {
220 let conn = Arc::clone(&self.conn);
221 let stun_serv_str = self.stun_serv_addr.clone();
222 let tr_map = Arc::clone(&self.tr_map);
223 let read_ch_tx = Arc::clone(&self.read_ch_tx);
224 let binding_mgr = Arc::clone(&self.binding_mgr);
225
226 tokio::spawn(async move {
227 let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE];
228 loop {
229 let (n, from) = match conn.recv_from(&mut buf).await {
231 Ok((n, from)) => (n, from),
232 Err(err) => {
233 log::debug!("exiting read loop: {}", err);
234 break;
235 }
236 };
237
238 log::debug!("received {} bytes of udp from {}", n, from);
239
240 if let Err(err) = ClientInternal::handle_inbound(
241 &read_ch_tx,
242 &buf[..n],
243 from,
244 &stun_serv_str,
245 &tr_map,
246 &binding_mgr,
247 )
248 .await
249 {
250 log::debug!("exiting read loop: {}", err);
251 break;
252 }
253 }
254 });
255
256 Ok(())
257 }
258
259 async fn handle_inbound(
267 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
268 data: &[u8],
269 from: SocketAddr,
270 stun_serv_str: &str,
271 tr_map: &Arc<Mutex<TransactionMap>>,
272 binding_mgr: &Arc<Mutex<BindingManager>>,
273 ) -> Result<(), Error> {
274 if is_message(data) {
293 ClientInternal::handle_stun_message(tr_map, read_ch_tx, data, from).await
294 } else if ChannelData::is_channel_data(data) {
295 ClientInternal::handle_channel_data(binding_mgr, read_ch_tx, data).await
296 } else if !stun_serv_str.is_empty() && from.to_string() == *stun_serv_str {
297 Err(ERR_NON_STUNMESSAGE.to_owned())
299 } else {
300 log::trace!("non-STUN/TURN packect, unhandled");
302 Ok(())
303 }
304 }
305
306 async fn handle_stun_message(
307 tr_map: &Arc<Mutex<TransactionMap>>,
308 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
309 data: &[u8],
310 mut from: SocketAddr,
311 ) -> Result<(), Error> {
312 let mut msg = Message::new();
313 msg.raw = data.to_vec();
314 msg.decode()?;
315
316 if msg.typ.class == CLASS_REQUEST {
317 return Err(Error::new(format!(
318 "{} : {}",
319 *ERR_UNEXPECTED_STUNREQUEST_MESSAGE, msg
320 )));
321 }
322
323 if msg.typ.class == CLASS_INDICATION {
324 if msg.typ.method == METHOD_DATA {
325 let mut peer_addr = PeerAddress::default();
326 peer_addr.get_from(&msg)?;
327 from = SocketAddr::new(peer_addr.ip, peer_addr.port);
328
329 let mut data = Data::default();
330 data.get_from(&msg)?;
331
332 log::debug!("data indication received from {}", from);
333
334 let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &data.0, from).await;
335 }
336
337 return Ok(());
338 }
339
340 let tr_key = base64::encode(&msg.transaction_id.0);
346
347 let mut tm = tr_map.lock().await;
348 if tm.find(&tr_key).is_none() {
349 log::debug!("no transaction for {}", msg);
351 return Ok(());
352 }
353
354 if let Some(mut tr) = tm.delete(&tr_key) {
355 tr.stop_rtx_timer();
357
358 if !tr
359 .write_result(TransactionResult {
360 msg,
361 from,
362 retries: tr.retries(),
363 ..Default::default()
364 })
365 .await
366 {
367 log::debug!("no listener for msg.raw {:?}", data);
368 }
369 }
370
371 Ok(())
372 }
373
374 async fn handle_channel_data(
375 binding_mgr: &Arc<Mutex<BindingManager>>,
376 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
377 data: &[u8],
378 ) -> Result<(), Error> {
379 let mut ch_data = ChannelData {
380 raw: data.to_vec(),
381 ..Default::default()
382 };
383 ch_data.decode()?;
384
385 let addr = ClientInternal::find_addr_by_channel_number(binding_mgr, ch_data.number.0)
386 .await
387 .ok_or_else(|| ERR_CHANNEL_BIND_NOT_FOUND.to_owned())?;
388
389 log::trace!(
390 "channel data received from {} (ch={})",
391 addr,
392 ch_data.number.0
393 );
394
395 let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &ch_data.data, addr).await;
396
397 Ok(())
398 }
399
400 async fn handle_inbound_relay_conn(
402 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
403 data: &[u8],
404 from: SocketAddr,
405 ) -> Result<(), Error> {
406 let read_ch_tx_opt = read_ch_tx.lock().await;
407 log::debug!("read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
408 if let Some(tx) = &*read_ch_tx_opt {
409 log::debug!("try_send data = {:?}, from = {}", data, from);
410 if tx
411 .try_send(InboundData {
412 data: data.to_vec(),
413 from,
414 })
415 .is_err()
416 {
417 log::warn!("receive buffer full");
418 }
419 Ok(())
420 } else {
421 Err(ERR_ALREADY_CLOSED.to_owned())
422 }
423 }
424
425 async fn close(&mut self) {
427 {
428 let mut read_ch_tx = self.read_ch_tx.lock().await;
429 read_ch_tx.take();
430 }
431 {
432 let mut tm = self.tr_map.lock().await;
433 tm.close_and_delete_all();
434 }
435 }
436
437 async fn send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr, Error> {
439 let msg = {
440 let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
441 vec![
442 Box::new(TransactionId::new()),
443 Box::new(BINDING_REQUEST),
444 Box::new(self.software.clone()),
445 ]
446 } else {
447 vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
448 };
449
450 let mut msg = Message::new();
451 msg.build(&attrs)?;
452 msg
453 };
454
455 log::debug!("client.SendBindingRequestTo call PerformTransaction 1");
456 let tr_res = self.perform_transaction(&msg, to, false).await?;
457
458 let mut refl_addr = XORMappedAddress::default();
459 refl_addr.get_from(&tr_res.msg)?;
460
461 Ok(SocketAddr::new(refl_addr.ip, refl_addr.port))
462 }
463
464 async fn send_binding_request(&mut self) -> Result<SocketAddr, Error> {
466 if self.stun_serv_addr.is_empty() {
467 Err(ERR_STUNSERVER_ADDRESS_NOT_SET.to_owned())
468 } else {
469 self.send_binding_request_to(&self.stun_serv_addr.clone())
470 .await
471 }
472 }
473
474 async fn find_addr_by_channel_number(
477 binding_mgr: &Arc<Mutex<BindingManager>>,
478 ch_num: u16,
479 ) -> Option<SocketAddr> {
480 let bm = binding_mgr.lock().await;
481 if let Some(b) = bm.find_by_number(ch_num) {
482 Some(b.addr)
483 } else {
484 None
485 }
486 }
487
488 async fn allocate(&mut self) -> Result<RelayConnConfig, Error> {
490 {
491 let read_ch_tx = self.read_ch_tx.lock().await;
492 log::debug!("allocate check: read_ch_tx_opt = {}", read_ch_tx.is_some());
493 if read_ch_tx.is_some() {
494 return Err(ERR_ONE_ALLOCATE_ONLY.to_owned());
495 }
496 }
497
498 let mut msg = Message::new();
499 msg.build(&[
500 Box::new(TransactionId::new()),
501 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
502 Box::new(RequestedTransport {
503 protocol: PROTO_UDP,
504 }),
505 Box::new(FINGERPRINT),
506 ])?;
507
508 log::debug!("client.Allocate call PerformTransaction 1");
509 let tr_res = self
510 .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
511 .await?;
512 let res = tr_res.msg;
513
514 let nonce = Nonce::get_from_as(&res, ATTR_NONCE)?;
516 self.realm = Realm::get_from_as(&res, ATTR_REALM)?;
517
518 self.integrity = MessageIntegrity::new_long_term_integrity(
519 self.username.text.clone(),
520 self.realm.text.clone(),
521 self.password.clone(),
522 );
523
524 msg.build(&[
526 Box::new(TransactionId::new()),
527 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
528 Box::new(RequestedTransport {
529 protocol: PROTO_UDP,
530 }),
531 Box::new(self.username.clone()),
532 Box::new(self.realm.clone()),
533 Box::new(nonce.clone()),
534 Box::new(self.integrity.clone()),
535 Box::new(FINGERPRINT),
536 ])?;
537
538 log::debug!("client.Allocate call PerformTransaction 2");
539 let tr_res = self
540 .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
541 .await?;
542 let res = tr_res.msg;
543
544 if res.typ.class == CLASS_ERROR_RESPONSE {
545 let mut code = ErrorCodeAttribute::default();
546 let result = code.get_from(&res);
547 if result.is_err() {
548 return Err(Error::new(format!("{}", res.typ)));
549 } else {
550 return Err(Error::new(format!("{} (error {})", res.typ, code)));
551 }
552 }
553
554 let mut relayed = RelayedAddress::default();
556 relayed.get_from(&res)?;
557 let relayed_addr = SocketAddr::new(relayed.ip, relayed.port);
558
559 let mut lifetime = Lifetime::default();
561 lifetime.get_from(&res)?;
562
563 let (read_ch_tx, read_ch_rx) = mpsc::channel(MAX_READ_QUEUE_SIZE);
564 {
565 let mut read_ch_tx_opt = self.read_ch_tx.lock().await;
566 *read_ch_tx_opt = Some(read_ch_tx);
567 log::debug!("allocate: read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
568 }
569
570 Ok(RelayConnConfig {
571 relayed_addr,
572 integrity: self.integrity.clone(),
573 nonce,
574 lifetime: lifetime.0,
575 binding_mgr: Arc::clone(&self.binding_mgr),
576 read_ch_rx: Arc::new(Mutex::new(read_ch_rx)),
577 })
578 }
579}
580
581#[derive(Clone)]
583pub struct Client {
584 client_internal: Arc<Mutex<ClientInternal>>,
585}
586
587impl Client {
588 pub async fn new(config: ClientConfig) -> Result<Self, Error> {
589 let ci = ClientInternal::new(config).await?;
590 Ok(Client {
591 client_internal: Arc::new(Mutex::new(ci)),
592 })
593 }
594
595 pub async fn listen(&self) -> Result<(), Error> {
596 let ci = self.client_internal.lock().await;
597 ci.listen().await
598 }
599
600 pub async fn allocate(&self) -> Result<impl Conn, Error> {
601 let config = {
602 let mut ci = self.client_internal.lock().await;
603 ci.allocate().await?
604 };
605
606 Ok(RelayConn::new(Arc::clone(&self.client_internal), config))
607 }
608
609 pub async fn close(&self) -> Result<(), Error> {
610 let mut ci = self.client_internal.lock().await;
611 ci.close().await;
612 Ok(())
613 }
614
615 pub async fn send_binding_request_to(&self, to: &str) -> Result<SocketAddr, Error> {
617 let mut ci = self.client_internal.lock().await;
618 ci.send_binding_request_to(to).await
619 }
620
621 pub async fn send_binding_request(&self) -> Result<SocketAddr, Error> {
623 let mut ci = self.client_internal.lock().await;
624 ci.send_binding_request().await
625 }
626}