1#[cfg(test)]
2mod client_test;
3
4pub mod binding;
5pub mod periodic_timer;
6pub mod permission;
7pub mod relay_conn;
8pub mod transaction;
9
10use std::net::SocketAddr;
11use std::str::FromStr;
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use base64::prelude::BASE64_STANDARD;
16use base64::Engine;
17use binding::*;
18use relay_conn::*;
19use stun::agent::*;
20use stun::attributes::*;
21use stun::error_code::*;
22use stun::fingerprint::*;
23use stun::integrity::*;
24use stun::message::*;
25use stun::textattrs::*;
26use stun::xoraddr::*;
27use tokio::pin;
28use tokio::select;
29use tokio::sync::{mpsc, Mutex};
30use tokio_util::sync::CancellationToken;
31use transaction::*;
32use util::conn::*;
33use util::vnet::net::*;
34
35use crate::error::*;
36use crate::proto::chandata::*;
37use crate::proto::data::*;
38use crate::proto::lifetime::*;
39use crate::proto::peeraddr::*;
40use crate::proto::relayaddr::*;
41use crate::proto::reqtrans::*;
42use crate::proto::PROTO_UDP;
43
/// Retransmission timeout, in milliseconds, applied when `ClientConfig::rto_in_ms` is zero.
const DEFAULT_RTO_IN_MS: u16 = 200;
/// Length in bytes of the receive buffer allocated by the read loop started in `listen`.
const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize;
/// Capacity of the bounded channel that `allocate` creates to queue inbound relayed data.
const MAX_READ_QUEUE_SIZE: usize = 1024;
47
/// Settings consumed by `Client::new`.
pub struct ClientConfig {
    /// STUN server address. Resolved at construction unless empty; when empty,
    /// `send_binding_request` fails with `Error::ErrStunserverAddressNotSet`.
    pub stun_serv_addr: String,
    /// TURN server address. Resolved at construction unless empty; Allocate requests are
    /// sent to the resolved address.
    pub turn_serv_addr: String,
    /// User name placed in the USERNAME attribute and used for long-term integrity.
    pub username: String,
    /// Password used to derive the long-term message integrity during `allocate`.
    pub password: String,
    /// Initial realm; `allocate` replaces it with the REALM returned by the server.
    pub realm: String,
    /// SOFTWARE attribute text; added to binding requests only when non-empty.
    pub software: String,
    /// Retransmission timeout in milliseconds; `0` selects `DEFAULT_RTO_IN_MS`.
    pub rto_in_ms: u16,
    /// Connection over which every message is sent and received.
    pub conn: Arc<dyn Conn + Send + Sync>,
    /// Optional network used for address resolution; when `None`, `Net::new(None)` is used.
    pub vnet: Option<Arc<Net>>,
}
70
/// State shared by `Client` handles and by the background read loop.
struct ClientInternal {
    /// Connection used for all outbound sends and for the read loop.
    conn: Arc<dyn Conn + Send + Sync>,
    /// Resolved STUN server address, or an empty string when none was configured.
    stun_serv_addr: String,
    /// Resolved TURN server address, or an empty string when none was configured.
    turn_serv_addr: String,
    /// USERNAME attribute built from the configured user name.
    username: Username,
    /// Password used to build the long-term integrity in `allocate`.
    password: String,
    /// REALM attribute; overwritten by `allocate` with the server-provided realm.
    realm: Realm,
    /// Message integrity; starts as a short-term integrity over an empty string and is
    /// replaced by a long-term integrity in `allocate`.
    integrity: MessageIntegrity,
    /// SOFTWARE attribute; attached to binding requests when its text is non-empty.
    software: Software,
    /// In-flight transactions keyed by the base64-encoded transaction id.
    tr_map: Arc<Mutex<TransactionMap>>,
    /// Channel bindings, consulted to map a channel number to a peer address.
    binding_mgr: Arc<Mutex<BindingManager>>,
    /// Retransmission interval in milliseconds handed to each new transaction.
    rto_in_ms: u16,
    /// Sender side of the inbound-data queue; `Some` once `allocate` succeeded, reset to
    /// `None` by `close`.
    read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
    /// Cancelled by `close` to stop the read loop.
    close_notify: CancellationToken,
}
86
87#[async_trait]
88impl RelayConnObserver for ClientInternal {
89 fn turn_server_addr(&self) -> String {
91 self.turn_serv_addr.clone()
92 }
93
94 fn username(&self) -> Username {
96 self.username.clone()
97 }
98
99 fn realm(&self) -> Realm {
101 self.realm.clone()
102 }
103
104 async fn write_to(&self, data: &[u8], to: &str) -> std::result::Result<usize, util::Error> {
106 let n = self.conn.send_to(data, SocketAddr::from_str(to)?).await?;
107 Ok(n)
108 }
109
110 async fn perform_transaction(
112 &mut self,
113 msg: &Message,
114 to: &str,
115 ignore_result: bool,
116 ) -> Result<TransactionResult> {
117 let tr_key = BASE64_STANDARD.encode(msg.transaction_id.0);
118
119 let mut tr = Transaction::new(TransactionConfig {
120 key: tr_key.clone(),
121 raw: msg.raw.clone(),
122 to: to.to_string(),
123 interval: self.rto_in_ms,
124 ignore_result,
125 });
126 let result_ch_rx = tr.get_result_channel();
127
128 log::trace!("start {} transaction {} to {}", msg.typ, tr_key, tr.to);
129 {
130 let mut tm = self.tr_map.lock().await;
131 tm.insert(tr_key.clone(), tr);
132 }
133
134 self.conn
135 .send_to(&msg.raw, SocketAddr::from_str(to)?)
136 .await?;
137
138 let conn2 = Arc::clone(&self.conn);
139 let tr_map2 = Arc::clone(&self.tr_map);
140 {
141 let mut tm = self.tr_map.lock().await;
142 if let Some(tr) = tm.get(&tr_key) {
143 tr.start_rtx_timer(conn2, tr_map2).await;
144 }
145 }
146
147 if ignore_result {
149 return Ok(TransactionResult::default());
150 }
151
152 if let Some(mut result_ch_rx) = result_ch_rx {
154 match result_ch_rx.recv().await {
155 Some(tr) => Ok(tr),
156 None => Err(Error::ErrTransactionClosed),
157 }
158 } else {
159 Err(Error::ErrWaitForResultOnNonResultTransaction)
160 }
161 }
162}
163
164impl ClientInternal {
165 async fn new(config: ClientConfig) -> Result<Self> {
167 let net = if let Some(vnet) = config.vnet {
168 if vnet.is_virtual() {
169 log::warn!("vnet is enabled");
170 }
171 vnet
172 } else {
173 Arc::new(Net::new(None))
174 };
175
176 let stun_serv_addr = if config.stun_serv_addr.is_empty() {
177 String::new()
178 } else {
179 log::debug!("resolving {}", config.stun_serv_addr);
180 let local_addr = config.conn.local_addr()?;
181 let stun_serv = net
182 .resolve_addr(local_addr.is_ipv4(), &config.stun_serv_addr)
183 .await?;
184 log::debug!("stunServ: {}", stun_serv);
185 stun_serv.to_string()
186 };
187
188 let turn_serv_addr = if config.turn_serv_addr.is_empty() {
189 String::new()
190 } else {
191 log::debug!("resolving {}", config.turn_serv_addr);
192 let local_addr = config.conn.local_addr()?;
193 let turn_serv = net
194 .resolve_addr(local_addr.is_ipv4(), &config.turn_serv_addr)
195 .await?;
196 log::debug!("turnServ: {}", turn_serv);
197 turn_serv.to_string()
198 };
199
200 Ok(ClientInternal {
201 conn: Arc::clone(&config.conn),
202 stun_serv_addr,
203 turn_serv_addr,
204 username: Username::new(ATTR_USERNAME, config.username),
205 password: config.password,
206 realm: Realm::new(ATTR_REALM, config.realm),
207 software: Software::new(ATTR_SOFTWARE, config.software),
208 tr_map: Arc::new(Mutex::new(TransactionMap::new())),
209 binding_mgr: Arc::new(Mutex::new(BindingManager::new())),
210 rto_in_ms: if config.rto_in_ms != 0 {
211 config.rto_in_ms
212 } else {
213 DEFAULT_RTO_IN_MS
214 },
215 integrity: MessageIntegrity::new_short_term_integrity(String::new()),
216 read_ch_tx: Arc::new(Mutex::new(None)),
217 close_notify: CancellationToken::new(),
218 })
219 }
220
221 fn stun_server_addr(&self) -> String {
223 self.stun_serv_addr.clone()
224 }
225
226 async fn listen(&self) -> Result<()> {
230 let conn = Arc::clone(&self.conn);
231 let stun_serv_str = self.stun_serv_addr.clone();
232 let tr_map = Arc::clone(&self.tr_map);
233 let read_ch_tx = Arc::clone(&self.read_ch_tx);
234 let binding_mgr = Arc::clone(&self.binding_mgr);
235 let close_notify = self.close_notify.clone();
236
237 tokio::spawn(async move {
238 let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE];
239 let wait_cancel = close_notify.cancelled();
240 pin!(wait_cancel);
241
242 loop {
243 let (n, from) = select! {
244 biased;
245
246 _ = &mut wait_cancel => {
247 log::debug!("exiting read loop");
248 break;
249 },
250 result = conn.recv_from(&mut buf) => match result {
251 Ok((n, from)) => (n, from),
252 Err(err) => {
253 log::debug!("exiting read loop: {}", err);
254 break;
255 }
256 }
257 };
258 log::debug!("received {} bytes of udp from {}", n, from);
259
260 select! {
261 biased;
262
263 _ = &mut wait_cancel => {
264 log::debug!("exiting read loop");
265 break;
266 },
267 result = ClientInternal::handle_inbound(
268 &read_ch_tx,
269 &buf[..n],
270 from,
271 &stun_serv_str,
272 &tr_map,
273 &binding_mgr,
274 ) => {
275 if let Err(err) = result {
276 log::debug!("exiting read loop: {}", err);
277 break;
278 }
279 }
280 }
281 }
282 });
283
284 Ok(())
285 }
286
287 async fn handle_inbound(
295 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
296 data: &[u8],
297 from: SocketAddr,
298 stun_serv_str: &str,
299 tr_map: &Arc<Mutex<TransactionMap>>,
300 binding_mgr: &Arc<Mutex<BindingManager>>,
301 ) -> Result<()> {
302 if is_message(data) {
321 ClientInternal::handle_stun_message(tr_map, read_ch_tx, data, from).await
322 } else if ChannelData::is_channel_data(data) {
323 ClientInternal::handle_channel_data(binding_mgr, read_ch_tx, data).await
324 } else if !stun_serv_str.is_empty() && from.to_string() == *stun_serv_str {
325 Err(Error::ErrNonStunmessage)
327 } else {
328 log::trace!("non-STUN/TURN packect, unhandled");
330 Ok(())
331 }
332 }
333
334 async fn handle_stun_message(
335 tr_map: &Arc<Mutex<TransactionMap>>,
336 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
337 data: &[u8],
338 mut from: SocketAddr,
339 ) -> Result<()> {
340 let mut msg = Message::new();
341 msg.raw = data.to_vec();
342 msg.decode()?;
343
344 if msg.typ.class == CLASS_REQUEST {
345 return Err(Error::Other(format!(
346 "{:?} : {}",
347 Error::ErrUnexpectedStunrequestMessage,
348 msg
349 )));
350 }
351
352 if msg.typ.class == CLASS_INDICATION {
353 if msg.typ.method == METHOD_DATA {
354 let mut peer_addr = PeerAddress::default();
355 peer_addr.get_from(&msg)?;
356 from = SocketAddr::new(peer_addr.ip, peer_addr.port);
357
358 let mut data = Data::default();
359 data.get_from(&msg)?;
360
361 log::debug!("data indication received from {}", from);
362
363 let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &data.0, from).await;
364 }
365
366 return Ok(());
367 }
368
369 let tr_key = BASE64_STANDARD.encode(msg.transaction_id.0);
375
376 let mut tm = tr_map.lock().await;
377 if tm.find(&tr_key).is_none() {
378 log::debug!("no transaction for {}", msg);
380 return Ok(());
381 }
382
383 if let Some(mut tr) = tm.delete(&tr_key) {
384 tr.stop_rtx_timer();
386
387 if !tr
388 .write_result(TransactionResult {
389 msg,
390 from,
391 retries: tr.retries(),
392 ..Default::default()
393 })
394 .await
395 {
396 log::debug!("no listener for msg.raw {:?}", data);
397 }
398 }
399
400 Ok(())
401 }
402
403 async fn handle_channel_data(
404 binding_mgr: &Arc<Mutex<BindingManager>>,
405 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
406 data: &[u8],
407 ) -> Result<()> {
408 let mut ch_data = ChannelData {
409 raw: data.to_vec(),
410 ..Default::default()
411 };
412 ch_data.decode()?;
413
414 let addr = ClientInternal::find_addr_by_channel_number(binding_mgr, ch_data.number.0)
415 .await
416 .ok_or(Error::ErrChannelBindNotFound)?;
417
418 log::trace!(
419 "channel data received from {} (ch={})",
420 addr,
421 ch_data.number.0
422 );
423
424 let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &ch_data.data, addr).await;
425
426 Ok(())
427 }
428
429 async fn handle_inbound_relay_conn(
431 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
432 data: &[u8],
433 from: SocketAddr,
434 ) -> Result<()> {
435 let read_ch_tx_opt = read_ch_tx.lock().await;
436 log::debug!("read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
437 if let Some(tx) = &*read_ch_tx_opt {
438 log::debug!("try_send data = {:?}, from = {}", data, from);
439 if tx
440 .try_send(InboundData {
441 data: data.to_vec(),
442 from,
443 })
444 .is_err()
445 {
446 log::warn!("receive buffer full");
447 }
448 Ok(())
449 } else {
450 Err(Error::ErrAlreadyClosed)
451 }
452 }
453
454 async fn close(&mut self) {
456 self.close_notify.cancel();
457 {
458 let mut read_ch_tx = self.read_ch_tx.lock().await;
459 read_ch_tx.take();
460 }
461 {
462 let mut tm = self.tr_map.lock().await;
463 tm.close_and_delete_all();
464 }
465 }
466
467 async fn send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr> {
469 let msg = {
470 let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
471 vec![
472 Box::new(TransactionId::new()),
473 Box::new(BINDING_REQUEST),
474 Box::new(self.software.clone()),
475 ]
476 } else {
477 vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
478 };
479
480 let mut msg = Message::new();
481 msg.build(&attrs)?;
482 msg
483 };
484
485 log::debug!("client.SendBindingRequestTo call PerformTransaction 1");
486 let tr_res = self.perform_transaction(&msg, to, false).await?;
487
488 let mut refl_addr = XorMappedAddress::default();
489 refl_addr.get_from(&tr_res.msg)?;
490
491 Ok(SocketAddr::new(refl_addr.ip, refl_addr.port))
492 }
493
494 async fn send_binding_request(&mut self) -> Result<SocketAddr> {
496 if self.stun_serv_addr.is_empty() {
497 Err(Error::ErrStunserverAddressNotSet)
498 } else {
499 self.send_binding_request_to(&self.stun_serv_addr.clone())
500 .await
501 }
502 }
503
504 async fn find_addr_by_channel_number(
507 binding_mgr: &Arc<Mutex<BindingManager>>,
508 ch_num: u16,
509 ) -> Option<SocketAddr> {
510 let bm = binding_mgr.lock().await;
511 bm.find_by_number(ch_num).map(|b| b.addr)
512 }
513
514 async fn allocate(&mut self) -> Result<RelayConnConfig> {
516 {
517 let read_ch_tx = self.read_ch_tx.lock().await;
518 log::debug!("allocate check: read_ch_tx_opt = {}", read_ch_tx.is_some());
519 if read_ch_tx.is_some() {
520 return Err(Error::ErrOneAllocateOnly);
521 }
522 }
523
524 let mut msg = Message::new();
525 msg.build(&[
526 Box::new(TransactionId::new()),
527 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
528 Box::new(RequestedTransport {
529 protocol: PROTO_UDP,
530 }),
531 Box::new(FINGERPRINT),
532 ])?;
533
534 log::debug!("client.Allocate call PerformTransaction 1");
535 let tr_res = self
536 .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
537 .await?;
538 let res = tr_res.msg;
539
540 let nonce = Nonce::get_from_as(&res, ATTR_NONCE)?;
542 self.realm = Realm::get_from_as(&res, ATTR_REALM)?;
543
544 self.integrity = MessageIntegrity::new_long_term_integrity(
545 self.username.text.clone(),
546 self.realm.text.clone(),
547 self.password.clone(),
548 );
549
550 msg.build(&[
552 Box::new(TransactionId::new()),
553 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
554 Box::new(RequestedTransport {
555 protocol: PROTO_UDP,
556 }),
557 Box::new(self.username.clone()),
558 Box::new(self.realm.clone()),
559 Box::new(nonce.clone()),
560 Box::new(self.integrity.clone()),
561 Box::new(FINGERPRINT),
562 ])?;
563
564 log::debug!("client.Allocate call PerformTransaction 2");
565 let tr_res = self
566 .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
567 .await?;
568 let res = tr_res.msg;
569
570 if res.typ.class == CLASS_ERROR_RESPONSE {
571 let mut code = ErrorCodeAttribute::default();
572 let result = code.get_from(&res);
573 if result.is_err() {
574 return Err(Error::Other(format!("{}", res.typ)));
575 } else {
576 return Err(Error::Other(format!("{} (error {})", res.typ, code)));
577 }
578 }
579
580 let mut relayed = RelayedAddress::default();
582 relayed.get_from(&res)?;
583 let relayed_addr = SocketAddr::new(relayed.ip, relayed.port);
584
585 let mut lifetime = Lifetime::default();
587 lifetime.get_from(&res)?;
588
589 let (read_ch_tx, read_ch_rx) = mpsc::channel(MAX_READ_QUEUE_SIZE);
590 {
591 let mut read_ch_tx_opt = self.read_ch_tx.lock().await;
592 *read_ch_tx_opt = Some(read_ch_tx);
593 log::debug!("allocate: read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
594 }
595
596 Ok(RelayConnConfig {
597 relayed_addr,
598 integrity: self.integrity.clone(),
599 nonce,
600 lifetime: lifetime.0,
601 binding_mgr: Arc::clone(&self.binding_mgr),
602 read_ch_rx: Arc::new(Mutex::new(read_ch_rx)),
603 })
604 }
605}
606
/// Cloneable client handle; every clone shares the same `ClientInternal` through an
/// `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct Client {
    /// Shared internal state, locked for the duration of each operation.
    client_internal: Arc<Mutex<ClientInternal>>,
}
612
613impl Client {
614 pub async fn new(config: ClientConfig) -> Result<Self> {
615 let ci = ClientInternal::new(config).await?;
616 Ok(Client {
617 client_internal: Arc::new(Mutex::new(ci)),
618 })
619 }
620
621 pub async fn listen(&self) -> Result<()> {
622 let ci = self.client_internal.lock().await;
623 ci.listen().await
624 }
625
626 pub async fn allocate(&self) -> Result<impl Conn> {
627 let config = {
628 let mut ci = self.client_internal.lock().await;
629 ci.allocate().await?
630 };
631
632 Ok(RelayConn::new(Arc::clone(&self.client_internal), config).await)
633 }
634
635 pub async fn close(&self) -> Result<()> {
636 let mut ci = self.client_internal.lock().await;
637 ci.close().await;
638 Ok(())
639 }
640
641 pub async fn send_binding_request_to(&self, to: &str) -> Result<SocketAddr> {
643 let mut ci = self.client_internal.lock().await;
644 ci.send_binding_request_to(to).await
645 }
646
647 pub async fn send_binding_request(&self) -> Result<SocketAddr> {
649 let mut ci = self.client_internal.lock().await;
650 ci.send_binding_request().await
651 }
652}