1use std::{cell::RefCell, io, net::SocketAddr, sync::Arc, time::Duration};
4
5use bytes::Bytes;
6use futures::future;
7use log::{debug, error, info, trace, warn};
8use lru_time_cache::LruCache;
9use rand::{Rng, SeedableRng, rngs::SmallRng};
10use shadowsocks::{
11 ServerConfig,
12 config::ServerUser,
13 crypto::CipherCategory,
14 lookup_then,
15 net::{
16 AcceptOpts, AddrFamily, UdpSocket as OutboundUdpSocket, UdpSocket as InboundUdpSocket,
17 get_ip_stack_capabilities,
18 },
19 relay::{
20 socks5::Address,
21 udprelay::{MAXIMUM_UDP_PAYLOAD_SIZE, ProxySocket, options::UdpSocketControlData},
22 },
23};
24use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle, time};
25
26use crate::net::{
27 MonProxySocket, UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE, UDP_ASSOCIATION_SEND_CHANNEL_SIZE,
28 packet_window::PacketWindowFilter, utils::to_ipv4_mapped,
29};
30
31use super::context::ServiceContext;
32
33#[derive(Debug, Clone, Copy)]
34enum NatKey {
35 PeerAddr(SocketAddr),
36 #[cfg(feature = "aead-cipher-2022")]
37 SessionId(u64),
38}
39
40type AssociationMap = LruCache<SocketAddr, UdpAssociation>;
41#[cfg(feature = "aead-cipher-2022")]
42type SessionMap = LruCache<u64, UdpAssociation>;
43
44enum NatMap {
45 Association(AssociationMap),
46 #[cfg(feature = "aead-cipher-2022")]
47 Session(SessionMap),
48}
49
50impl NatMap {
51 fn cleanup_expired(&mut self) {
52 match *self {
53 Self::Association(ref mut m) => {
54 m.iter();
55 }
56 #[cfg(feature = "aead-cipher-2022")]
57 Self::Session(ref mut m) => {
58 m.iter();
59 }
60 }
61 }
62
63 fn keep_alive(&mut self, key: &NatKey) {
64 match (self, key) {
65 (Self::Association(m), NatKey::PeerAddr(peer_addr)) => {
66 m.get(peer_addr);
67 }
68 #[cfg(feature = "aead-cipher-2022")]
69 (Self::Session(m), NatKey::SessionId(session_id)) => {
70 m.get(session_id);
71 }
72 #[allow(unreachable_patterns)]
73 _ => unreachable!("NatMap & NatKey mismatch"),
74 }
75 }
76}
77
78pub struct UdpServer {
80 context: Arc<ServiceContext>,
81 assoc_map: NatMap,
82 keepalive_tx: mpsc::Sender<NatKey>,
83 keepalive_rx: mpsc::Receiver<NatKey>,
84 time_to_live: Duration,
85 listener: Arc<MonProxySocket<InboundUdpSocket>>,
86 svr_cfg: ServerConfig,
87}
88
89impl UdpServer {
90 pub(crate) async fn new(
91 context: Arc<ServiceContext>,
92 svr_cfg: ServerConfig,
93 time_to_live: Option<Duration>,
94 capacity: Option<usize>,
95 accept_opts: AcceptOpts,
96 ) -> io::Result<Self> {
97 let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
98
99 fn create_assoc_map<K, V>(time_to_live: Duration, capacity: Option<usize>) -> LruCache<K, V>
100 where
101 K: Ord + Clone,
102 {
103 match capacity {
104 Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
105 None => LruCache::with_expiry_duration(time_to_live),
106 }
107 }
108
109 let assoc_map = match svr_cfg.method().category() {
110 CipherCategory::None => NatMap::Association(create_assoc_map(time_to_live, capacity)),
111 #[cfg(feature = "aead-cipher")]
112 CipherCategory::Aead => NatMap::Association(create_assoc_map(time_to_live, capacity)),
113 #[cfg(feature = "stream-cipher")]
114 CipherCategory::Stream => NatMap::Association(create_assoc_map(time_to_live, capacity)),
115 #[cfg(feature = "aead-cipher-2022")]
116 CipherCategory::Aead2022 => NatMap::Session(create_assoc_map(time_to_live, capacity)),
117 };
118
119 let (keepalive_tx, keepalive_rx) = mpsc::channel(UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE);
120
121 let socket = ProxySocket::bind_with_opts(context.context(), &svr_cfg, accept_opts).await?;
122 let socket = MonProxySocket::from_socket(socket, context.flow_stat());
123 let listener = Arc::new(socket);
124
125 Ok(Self {
126 context,
127 assoc_map,
128 keepalive_tx,
129 keepalive_rx,
130 time_to_live,
131 listener,
132 svr_cfg,
133 })
134 }
135
136 pub fn server_config(&self) -> &ServerConfig {
138 &self.svr_cfg
139 }
140
141 pub fn local_addr(&self) -> io::Result<SocketAddr> {
143 self.listener.get_ref().local_addr()
144 }
145
146 pub async fn run(mut self) -> io::Result<()> {
148 info!(
149 "shadowsocks udp server listening on {}, inbound address {}",
150 self.local_addr().expect("listener.local_addr"),
151 self.svr_cfg.addr(),
152 );
153
154 let mut cleanup_timer = time::interval(self.time_to_live);
155
156 let mut orx_opt = None;
157
158 let cpus = Handle::current().metrics().num_workers();
159 let mut other_receivers = Vec::new();
160 if cpus > 1 {
161 let (otx, orx) = mpsc::channel((cpus - 1) * 16);
162 orx_opt = Some(orx);
163
164 other_receivers.reserve(cpus - 1);
165 trace!("udp server starting extra {} recv workers", cpus - 1);
166
167 for _ in 1..cpus {
168 let otx = otx.clone();
169 let listener = self.listener.clone();
170 let context = self.context.clone();
171
172 other_receivers.push(tokio::spawn(async move {
173 let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
174
175 loop {
176 let (n, peer_addr, target_addr, control) =
177 match Self::recv_one_packet(&context, &listener, &mut buffer).await {
178 Some(s) => s,
179 None => continue,
180 };
181
182 if (otx
183 .send((peer_addr, target_addr, control, Bytes::copy_from_slice(&buffer[..n])))
184 .await)
185 .is_err()
186 {
187 break;
189 }
190 }
191 }));
192 }
193 }
194
195 struct MulticoreTaskGuard<'a> {
196 tasks: &'a mut Vec<JoinHandle<()>>,
197 }
198
199 impl Drop for MulticoreTaskGuard<'_> {
200 fn drop(&mut self) {
201 for task in self.tasks.iter_mut() {
202 task.abort();
203 }
204 }
205 }
206
207 let _guard = MulticoreTaskGuard {
208 tasks: &mut other_receivers,
209 };
210
211 type QueuedDataType = (SocketAddr, Address, Option<UdpSocketControlData>, Bytes);
212
213 #[inline]
214 async fn multicore_recv(orx_opt: &mut Option<mpsc::Receiver<QueuedDataType>>) -> QueuedDataType {
215 match orx_opt {
216 None => future::pending().await,
217 Some(orx) => match orx.recv().await {
218 Some(t) => t,
219 None => unreachable!("multicore sender should keep at least 1"),
220 },
221 }
222 }
223
224 let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
225 let listener = self.listener.clone();
227 loop {
228 tokio::select! {
229 _ = cleanup_timer.tick() => {
230 self.assoc_map.cleanup_expired();
232 }
233
234 peer_addr_opt = self.keepalive_rx.recv() => {
235 let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectedly");
236 self.assoc_map.keep_alive(&peer_addr);
237 }
238
239 recv_result = Self::recv_one_packet(&self.context, &listener, &mut buffer) => {
240 let (n, peer_addr, target_addr, control) = match recv_result {
241 Some(s) => s,
242 None => continue,
243 };
244
245 let data = &buffer[..n];
246 if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, control, Bytes::copy_from_slice(data)).await {
247 debug!(
248 "udp packet relay {} with {} bytes failed, error: {}",
249 peer_addr,
250 data.len(),
251 err
252 );
253 }
254 }
255
256 recv_result = multicore_recv(&mut orx_opt), if orx_opt.is_some() => {
257 let (peer_addr, target_addr, control, data) = recv_result;
258 let data_len = data.len();
259 if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, control, data).await {
260 debug!(
261 "udp packet relay {} with {} bytes failed, error: {}",
262 peer_addr,
263 data_len,
264 err
265 );
266 }
267 }
268 }
269 }
270 }
271
272 async fn recv_one_packet(
273 context: &ServiceContext,
274 l: &MonProxySocket<InboundUdpSocket>,
275 buffer: &mut [u8],
276 ) -> Option<(usize, SocketAddr, Address, Option<UdpSocketControlData>)> {
277 let (n, peer_addr, target_addr, control) = match l.recv_from_with_ctrl(buffer).await {
278 Ok(s) => s,
279 Err(err) => {
280 error!("udp server recv packet failed. {}", err);
281 return None;
282 }
283 };
284
285 if n == 0 {
286 return None;
294 }
295
296 if context.check_client_blocked(&peer_addr) {
297 warn!(
298 "udp client {} outbound {} access denied by ACL rules",
299 peer_addr, target_addr
300 );
301 return None;
302 }
303
304 if context.check_outbound_blocked(&target_addr).await {
305 warn!("udp client {} outbound {} blocked by ACL rules", peer_addr, target_addr);
306 return None;
307 }
308
309 Some((n, peer_addr, target_addr, control))
310 }
311
312 async fn send_packet(
313 &mut self,
314 listener: &Arc<MonProxySocket<InboundUdpSocket>>,
315 peer_addr: SocketAddr,
316 target_addr: Address,
317 control: Option<UdpSocketControlData>,
318 data: Bytes,
319 ) -> io::Result<()> {
320 match self.assoc_map {
321 NatMap::Association(ref mut m) => {
322 if let Some(assoc) = m.get(&peer_addr) {
323 return assoc.try_send((peer_addr, target_addr, data, control));
324 }
325
326 let assoc = UdpAssociation::new_association(
327 self.context.clone(),
328 listener.clone(),
329 peer_addr,
330 self.keepalive_tx.clone(),
331 );
332
333 debug!("created udp association for {}", peer_addr);
334
335 assoc.try_send((peer_addr, target_addr, data, control))?;
336 m.insert(peer_addr, assoc);
337 }
338 #[cfg(feature = "aead-cipher-2022")]
339 NatMap::Session(ref mut m) => {
340 let xcontrol = match control {
341 None => {
342 error!("control is required for session based NAT, from {}", peer_addr);
343 return Err(io::Error::other("control data missing in packet"));
344 }
345 Some(ref c) => c,
346 };
347
348 let client_session_id = xcontrol.client_session_id;
349
350 if let Some(assoc) = m.get(&client_session_id) {
351 return assoc.try_send((peer_addr, target_addr, data, control));
352 }
353
354 let assoc = UdpAssociation::new_session(
355 self.context.clone(),
356 listener.clone(),
357 peer_addr,
358 self.keepalive_tx.clone(),
359 client_session_id,
360 );
361
362 debug!(
363 "created udp association for {} with session {}",
364 peer_addr, client_session_id
365 );
366
367 assoc.try_send((peer_addr, target_addr, data, control))?;
368 m.insert(client_session_id, assoc);
369 }
370 }
371
372 Ok(())
373 }
374}
375
376type UdpAssociationSendMessage = (SocketAddr, Address, Bytes, Option<UdpSocketControlData>);
377
378struct UdpAssociation {
379 assoc_handle: JoinHandle<()>,
380 sender: mpsc::Sender<UdpAssociationSendMessage>,
381}
382
383impl Drop for UdpAssociation {
384 fn drop(&mut self) {
385 self.assoc_handle.abort();
386 }
387}
388
389impl UdpAssociation {
390 fn new_association(
391 context: Arc<ServiceContext>,
392 inbound: Arc<MonProxySocket<InboundUdpSocket>>,
393 peer_addr: SocketAddr,
394 keepalive_tx: mpsc::Sender<NatKey>,
395 ) -> Self {
396 let (assoc_handle, sender) = UdpAssociationContext::create(context, inbound, peer_addr, keepalive_tx, None);
397 Self { assoc_handle, sender }
398 }
399
400 #[cfg(feature = "aead-cipher-2022")]
401 fn new_session(
402 context: Arc<ServiceContext>,
403 inbound: Arc<MonProxySocket<InboundUdpSocket>>,
404 peer_addr: SocketAddr,
405 keepalive_tx: mpsc::Sender<NatKey>,
406 client_session_id: u64,
407 ) -> Self {
408 let (assoc_handle, sender) =
409 UdpAssociationContext::create(context, inbound, peer_addr, keepalive_tx, Some(client_session_id));
410 Self { assoc_handle, sender }
411 }
412
413 fn try_send(&self, data: UdpAssociationSendMessage) -> io::Result<()> {
414 if self.sender.try_send(data).is_err() {
415 let err = io::Error::other("udp relay channel full");
416 return Err(err);
417 }
418 Ok(())
419 }
420}
421
422struct ClientSessionContext {
423 client_session_id: u64,
424 packet_window_filter: PacketWindowFilter,
425 client_user: Option<Arc<ServerUser>>,
426}
427
428impl ClientSessionContext {
429 fn new(client_session_id: u64) -> Self {
430 Self {
431 client_session_id,
432 packet_window_filter: PacketWindowFilter::new(),
433 client_user: None,
434 }
435 }
436}
437
438struct UdpAssociationContext {
439 context: Arc<ServiceContext>,
440 peer_addr: SocketAddr,
441 outbound_ipv4_socket: Option<OutboundUdpSocket>,
442 outbound_ipv6_socket: Option<OutboundUdpSocket>,
443 keepalive_tx: mpsc::Sender<NatKey>,
444 keepalive_flag: bool,
445 inbound: Arc<MonProxySocket<InboundUdpSocket>>,
446 client_session: Option<ClientSessionContext>,
448 server_session_id: u64,
449 server_packet_id: u64,
450}
451
452impl Drop for UdpAssociationContext {
453 fn drop(&mut self) {
454 debug!("udp association for {} is closed", self.peer_addr);
455 }
456}
457
458thread_local! {
459 static CLIENT_SESSION_RNG: RefCell<SmallRng> = RefCell::new(SmallRng::from_os_rng());
460}
461
462#[inline]
463fn generate_server_session_id() -> u64 {
464 loop {
465 let id = CLIENT_SESSION_RNG.with(|rng| rng.borrow_mut().random());
466 if id != 0 {
467 break id;
468 }
469 }
470}
471
472impl UdpAssociationContext {
473 fn create(
474 context: Arc<ServiceContext>,
475 inbound: Arc<MonProxySocket<InboundUdpSocket>>,
476 peer_addr: SocketAddr,
477 keepalive_tx: mpsc::Sender<NatKey>,
478 client_session_id: Option<u64>,
479 ) -> (JoinHandle<()>, mpsc::Sender<UdpAssociationSendMessage>) {
480 let (sender, receiver) = mpsc::channel(UDP_ASSOCIATION_SEND_CHANNEL_SIZE);
484
485 let mut assoc = Self {
486 context,
487 peer_addr,
488 outbound_ipv4_socket: None,
489 outbound_ipv6_socket: None,
490 keepalive_tx,
491 keepalive_flag: false,
492 inbound,
493 client_session: client_session_id.map(ClientSessionContext::new),
494 server_session_id: generate_server_session_id(),
496 server_packet_id: 0,
497 };
498 let handle = tokio::spawn(async move { assoc.dispatch_packet(receiver).await });
499
500 (handle, sender)
501 }
502
503 async fn dispatch_packet(&mut self, mut receiver: mpsc::Receiver<UdpAssociationSendMessage>) {
504 let mut outbound_ipv4_buffer = Vec::new();
505 let mut outbound_ipv6_buffer = Vec::new();
506 let mut keepalive_interval = time::interval(Duration::from_secs(1));
507
508 loop {
509 tokio::select! {
510 packet_received_opt = receiver.recv() => {
511 let (peer_addr, target_addr, data, control) = match packet_received_opt {
512 Some(d) => d,
513 None => {
514 trace!("udp association for {} -> ... channel closed", self.peer_addr);
515 break;
516 }
517 };
518
519 self.dispatch_received_packet(peer_addr, &target_addr, &data, &control).await;
520 }
521
522 received_opt = receive_from_outbound_opt(&self.outbound_ipv4_socket, &mut outbound_ipv4_buffer), if self.outbound_ipv4_socket.is_some() => {
523 let (n, addr) = match received_opt {
524 Ok(r) => r,
525 Err(err) => {
526 error!("udp relay {} <- ... failed, error: {}", self.peer_addr, err);
527 self.outbound_ipv4_socket = None;
529 continue;
530 }
531 };
532
533 let addr = Address::from(addr);
534 self.send_received_respond_packet(addr, &outbound_ipv4_buffer[..n]).await;
535 }
536
537 received_opt = receive_from_outbound_opt(&self.outbound_ipv6_socket, &mut outbound_ipv6_buffer), if self.outbound_ipv6_socket.is_some() => {
538 let (n, addr) = match received_opt {
539 Ok(r) => r,
540 Err(err) => {
541 error!("udp relay {} <- ... failed, error: {}", self.peer_addr, err);
542 self.outbound_ipv6_socket = None;
544 continue;
545 }
546 };
547
548 let addr = Address::from(addr);
549 self.send_received_respond_packet(addr, &outbound_ipv6_buffer[..n]).await;
550 }
551
552 _ = keepalive_interval.tick() => {
553 if self.keepalive_flag {
554 let nat_key = match self.client_session {
555 None => NatKey::PeerAddr(self.peer_addr),
556 #[cfg(feature = "aead-cipher-2022")]
557 Some(ref s) => NatKey::SessionId(s.client_session_id),
558 #[cfg(not(feature = "aead-cipher-2022"))]
559 Some(..) => unreachable!("client_session_id is not None but aead-cipher-2022 is not enabled"),
560 };
561
562 if self.keepalive_tx.try_send(nat_key).is_err() {
563 debug!("udp relay {:?} keep-alive failed, channel full or closed", nat_key);
564 } else {
565 self.keepalive_flag = false;
566 }
567 }
568 }
569 }
570 }
571
572 #[inline]
573 async fn receive_from_outbound_opt(
574 socket: &Option<OutboundUdpSocket>,
575 buf: &mut Vec<u8>,
576 ) -> io::Result<(usize, SocketAddr)> {
577 match *socket {
578 None => future::pending().await,
579 Some(ref s) => {
580 if buf.is_empty() {
581 buf.resize(MAXIMUM_UDP_PAYLOAD_SIZE, 0);
582 }
583 s.recv_from(buf).await
584 }
585 }
586 }
587 }
588
589 async fn dispatch_received_packet(
590 &mut self,
591 peer_addr: SocketAddr,
592 target_addr: &Address,
593 data: &[u8],
594 control: &Option<UdpSocketControlData>,
595 ) {
596 if let Some(ref mut session) = self.client_session
597 && peer_addr != self.peer_addr {
598 debug!(
599 "udp relay for {} changed to {}, session: {:?}",
600 self.peer_addr, peer_addr, session.client_session_id
601 );
602 self.peer_addr = peer_addr;
603 }
604
605 trace!(
606 "udp relay {} -> {} with {} bytes, control: {:?}",
607 self.peer_addr,
608 target_addr,
609 data.len(),
610 control,
611 );
612
613 if self.context.check_outbound_blocked(target_addr).await {
614 error!(
615 "udp client {} outbound {} blocked by ACL rules",
616 self.peer_addr, target_addr
617 );
618 return;
619 }
620
621 if let Some(control) = control {
622 let session_context = self
625 .client_session
626 .get_or_insert_with(|| ClientSessionContext::new(control.client_session_id));
627
628 let packet_id = control.packet_id;
629 if !session_context
630 .packet_window_filter
631 .validate_packet_id(packet_id, u64::MAX)
632 {
633 error!("udp client {} packet_id {} out of window", self.peer_addr, packet_id);
634 return;
635 }
636
637 session_context.client_user.clone_from(&control.user);
638 }
639
640 if let Err(err) = self.dispatch_received_outbound_packet(target_addr, data).await {
641 error!(
642 "udp relay {} -> {} with {} bytes, error: {}",
643 self.peer_addr,
644 target_addr,
645 data.len(),
646 err
647 );
648 }
649 }
650
651 async fn dispatch_received_outbound_packet(&mut self, target_addr: &Address, data: &[u8]) -> io::Result<()> {
652 match *target_addr {
653 Address::SocketAddress(sa) => self.send_received_outbound_packet(sa, data).await,
654 Address::DomainNameAddress(ref dname, port) => {
655 lookup_then!(self.context.context_ref(), dname, port, |sa| {
656 self.send_received_outbound_packet(sa, data).await
657 })
658 .map(|_| ())
659 }
660 }
661 }
662
663 async fn send_received_outbound_packet(&mut self, original_target_addr: SocketAddr, data: &[u8]) -> io::Result<()> {
664 let ip_stack_caps = get_ip_stack_capabilities();
665
666 let target_addr = match original_target_addr {
667 SocketAddr::V4(ref v4) => {
668 if ip_stack_caps.support_ipv4_mapped_ipv6 {
672 SocketAddr::new(v4.ip().to_ipv6_mapped().into(), v4.port())
673 } else {
674 original_target_addr
675 }
676 }
677 SocketAddr::V6(ref v6) => {
678 if !ip_stack_caps.support_ipv6 || !ip_stack_caps.support_ipv4_mapped_ipv6 {
680 match v6.ip().to_ipv4_mapped() {
681 Some(v4) => SocketAddr::new(v4.into(), v6.port()),
682 None => original_target_addr,
683 }
684 } else {
685 original_target_addr
686 }
687 }
688 };
689
690 let socket = match target_addr {
691 SocketAddr::V4(..) => match self.outbound_ipv4_socket {
692 Some(ref mut socket) => socket,
693 None => {
694 let socket =
695 OutboundUdpSocket::connect_any_with_opts(AddrFamily::Ipv4, self.context.connect_opts_ref())
696 .await?;
697 self.outbound_ipv4_socket.insert(socket)
698 }
699 },
700 SocketAddr::V6(..) => match self.outbound_ipv6_socket {
701 Some(ref mut socket) => socket,
702 None => {
703 let socket =
704 OutboundUdpSocket::connect_any_with_opts(AddrFamily::Ipv6, self.context.connect_opts_ref())
705 .await?;
706 self.outbound_ipv6_socket.insert(socket)
707 }
708 },
709 };
710
711 match socket.send_to(data, target_addr).await {
712 Ok(n) => {
713 if n != data.len() {
714 warn!(
715 "{} -> {} sent {} bytes != expected {} bytes",
716 self.peer_addr,
717 target_addr,
718 n,
719 data.len()
720 );
721 }
722 Ok(())
723 }
724 Err(err) => Err(err),
725 }
726 }
727
728 async fn send_received_respond_packet(&mut self, mut addr: Address, data: &[u8]) {
729 trace!("udp relay {} <- {} received {} bytes", self.peer_addr, addr, data.len());
730
731 self.keepalive_flag = true;
733
734 if let Address::SocketAddress(SocketAddr::V6(ref v6)) = addr
740 && let Some(v4) = to_ipv4_mapped(v6.ip()) {
741 addr = Address::SocketAddress(SocketAddr::new(v4.into(), v6.port()));
742 }
743
744 match self.client_session {
745 None => {
746 match self.inbound.send_to(self.peer_addr, &addr, data).await {
748 Err(err) => {
749 warn!(
750 "udp failed to send back {} bytes to client {}, from target {}, error: {}",
751 data.len(),
752 self.peer_addr,
753 addr,
754 err
755 );
756 }
757 _ => {
758 trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len());
759 }
760 }
761 }
762 Some(ref client_session) => {
763 self.server_packet_id = match self.server_packet_id.checked_add(1) {
767 Some(i) => i,
768 None => {
769 warn!(
774 "udp failed to send back {} bytes to client {}, from target {}, server packet id overflowed",
775 data.len(),
776 self.peer_addr,
777 addr
778 );
779 return;
780 }
781 };
782
783 let mut control = UdpSocketControlData::default();
784 control.client_session_id = client_session.client_session_id;
785 control.server_session_id = self.server_session_id;
786 control.packet_id = self.server_packet_id;
787 control.user.clone_from(&client_session.client_user);
788
789 match self
790 .inbound
791 .send_to_with_ctrl(self.peer_addr, &addr, &control, data)
792 .await
793 {
794 Err(err) => {
795 warn!(
796 "udp failed to send back {} bytes to client {}, from target {}, control: {:?}, error: {}",
797 data.len(),
798 self.peer_addr,
799 addr,
800 control,
801 err
802 );
803 }
804 _ => {
805 trace!(
806 "udp relay {} <- {} with {} bytes, control {:?}",
807 self.peer_addr,
808 addr,
809 data.len(),
810 control
811 );
812 }
813 }
814 }
815 }
816 }
817}