shadowsocks_service/server/
udprelay.rs

1//! Shadowsocks UDP server
2
3use std::{cell::RefCell, io, net::SocketAddr, sync::Arc, time::Duration};
4
5use bytes::Bytes;
6use futures::future;
7use log::{debug, error, info, trace, warn};
8use lru_time_cache::LruCache;
9use rand::{Rng, SeedableRng, rngs::SmallRng};
10use shadowsocks::{
11    ServerConfig,
12    config::ServerUser,
13    crypto::CipherCategory,
14    lookup_then,
15    net::{
16        AcceptOpts, AddrFamily, UdpSocket as OutboundUdpSocket, UdpSocket as InboundUdpSocket,
17        get_ip_stack_capabilities,
18    },
19    relay::{
20        socks5::Address,
21        udprelay::{MAXIMUM_UDP_PAYLOAD_SIZE, ProxySocket, options::UdpSocketControlData},
22    },
23};
24use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle, time};
25
26use crate::net::{
27    MonProxySocket, UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE, UDP_ASSOCIATION_SEND_CHANNEL_SIZE,
28    packet_window::PacketWindowFilter, utils::to_ipv4_mapped,
29};
30
31use super::context::ServiceContext;
32
/// Key identifying one entry of the server's [`NatMap`].
///
/// Associations report their own key over the keep-alive channel so that the
/// server loop can refresh the matching map entry.
#[derive(Debug, Clone, Copy)]
enum NatKey {
    /// Entry keyed by the client's socket address.
    PeerAddr(SocketAddr),
    /// Entry keyed by the client session ID carried in the packet control data.
    #[cfg(feature = "aead-cipher-2022")]
    SessionId(u64),
}
39
/// Expiring LRU cache of associations keyed by the client's socket address.
type AssociationMap = LruCache<SocketAddr, UdpAssociation>;
/// Expiring LRU cache of associations keyed by the client session ID.
#[cfg(feature = "aead-cipher-2022")]
type SessionMap = LruCache<u64, UdpAssociation>;
43
/// NAT table of the UDP server.
///
/// The variant is selected once, in `UdpServer::new`, from the cipher category
/// of the configured method.
enum NatMap {
    /// Associations are looked up by client socket address.
    Association(AssociationMap),
    /// Associations are looked up by client session ID.
    #[cfg(feature = "aead-cipher-2022")]
    Session(SessionMap),
}
49
50impl NatMap {
51    fn cleanup_expired(&mut self) {
52        match *self {
53            Self::Association(ref mut m) => {
54                m.iter();
55            }
56            #[cfg(feature = "aead-cipher-2022")]
57            Self::Session(ref mut m) => {
58                m.iter();
59            }
60        }
61    }
62
63    fn keep_alive(&mut self, key: &NatKey) {
64        match (self, key) {
65            (Self::Association(m), NatKey::PeerAddr(peer_addr)) => {
66                m.get(peer_addr);
67            }
68            #[cfg(feature = "aead-cipher-2022")]
69            (Self::Session(m), NatKey::SessionId(session_id)) => {
70                m.get(session_id);
71            }
72            #[allow(unreachable_patterns)]
73            _ => unreachable!("NatMap & NatKey mismatch"),
74        }
75    }
76}
77
/// UDP server instance
///
/// Owns the inbound socket and the NAT table that maps clients to their
/// per-client relay tasks.
pub struct UdpServer {
    /// Shared service context (ACL checks, flow statistics, connect options).
    context: Arc<ServiceContext>,
    /// NAT table holding one association per client address or session.
    assoc_map: NatMap,
    /// Sending half of the keep-alive channel; cloned into every association.
    keepalive_tx: mpsc::Sender<NatKey>,
    /// Receiving half of the keep-alive channel, drained by `run`.
    keepalive_rx: mpsc::Receiver<NatKey>,
    /// Expiry duration of associations; also the period of the cleanup timer.
    time_to_live: Duration,
    /// Inbound socket, shared with receive workers and associations.
    listener: Arc<MonProxySocket<InboundUdpSocket>>,
    /// Configuration of the server this instance was created for.
    svr_cfg: ServerConfig,
}
88
89impl UdpServer {
90    pub(crate) async fn new(
91        context: Arc<ServiceContext>,
92        svr_cfg: ServerConfig,
93        time_to_live: Option<Duration>,
94        capacity: Option<usize>,
95        accept_opts: AcceptOpts,
96    ) -> io::Result<Self> {
97        let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
98
99        fn create_assoc_map<K, V>(time_to_live: Duration, capacity: Option<usize>) -> LruCache<K, V>
100        where
101            K: Ord + Clone,
102        {
103            match capacity {
104                Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
105                None => LruCache::with_expiry_duration(time_to_live),
106            }
107        }
108
109        let assoc_map = match svr_cfg.method().category() {
110            CipherCategory::None => NatMap::Association(create_assoc_map(time_to_live, capacity)),
111            #[cfg(feature = "aead-cipher")]
112            CipherCategory::Aead => NatMap::Association(create_assoc_map(time_to_live, capacity)),
113            #[cfg(feature = "stream-cipher")]
114            CipherCategory::Stream => NatMap::Association(create_assoc_map(time_to_live, capacity)),
115            #[cfg(feature = "aead-cipher-2022")]
116            CipherCategory::Aead2022 => NatMap::Session(create_assoc_map(time_to_live, capacity)),
117        };
118
119        let (keepalive_tx, keepalive_rx) = mpsc::channel(UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE);
120
121        let socket = ProxySocket::bind_with_opts(context.context(), &svr_cfg, accept_opts).await?;
122        let socket = MonProxySocket::from_socket(socket, context.flow_stat());
123        let listener = Arc::new(socket);
124
125        Ok(Self {
126            context,
127            assoc_map,
128            keepalive_tx,
129            keepalive_rx,
130            time_to_live,
131            listener,
132            svr_cfg,
133        })
134    }
135
    /// Server's configuration
    ///
    /// Returns a reference to the `ServerConfig` this server was created with.
    pub fn server_config(&self) -> &ServerConfig {
        &self.svr_cfg
    }
140
    /// Server's listen address
    ///
    /// Asks the inbound socket for its local address.
    ///
    /// # Errors
    ///
    /// Returns whatever error the socket reports for the query.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.listener.get_ref().local_addr()
    }
145
146    /// Start server's accept loop
147    pub async fn run(mut self) -> io::Result<()> {
148        info!(
149            "shadowsocks udp server listening on {}, inbound address {}",
150            self.local_addr().expect("listener.local_addr"),
151            self.svr_cfg.addr(),
152        );
153
154        let mut cleanup_timer = time::interval(self.time_to_live);
155
156        let mut orx_opt = None;
157
158        let cpus = Handle::current().metrics().num_workers();
159        let mut other_receivers = Vec::new();
160        if cpus > 1 {
161            let (otx, orx) = mpsc::channel((cpus - 1) * 16);
162            orx_opt = Some(orx);
163
164            other_receivers.reserve(cpus - 1);
165            trace!("udp server starting extra {} recv workers", cpus - 1);
166
167            for _ in 1..cpus {
168                let otx = otx.clone();
169                let listener = self.listener.clone();
170                let context = self.context.clone();
171
172                other_receivers.push(tokio::spawn(async move {
173                    let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
174
175                    loop {
176                        let (n, peer_addr, target_addr, control) =
177                            match Self::recv_one_packet(&context, &listener, &mut buffer).await {
178                                Some(s) => s,
179                                None => continue,
180                            };
181
182                        if (otx
183                            .send((peer_addr, target_addr, control, Bytes::copy_from_slice(&buffer[..n])))
184                            .await)
185                            .is_err()
186                        {
187                            // If Result is error, the channel receiver is closed. We should exit the task.
188                            break;
189                        }
190                    }
191                }));
192            }
193        }
194
195        struct MulticoreTaskGuard<'a> {
196            tasks: &'a mut Vec<JoinHandle<()>>,
197        }
198
199        impl Drop for MulticoreTaskGuard<'_> {
200            fn drop(&mut self) {
201                for task in self.tasks.iter_mut() {
202                    task.abort();
203                }
204            }
205        }
206
207        let _guard = MulticoreTaskGuard {
208            tasks: &mut other_receivers,
209        };
210
211        type QueuedDataType = (SocketAddr, Address, Option<UdpSocketControlData>, Bytes);
212
213        #[inline]
214        async fn multicore_recv(orx_opt: &mut Option<mpsc::Receiver<QueuedDataType>>) -> QueuedDataType {
215            match orx_opt {
216                None => future::pending().await,
217                Some(orx) => match orx.recv().await {
218                    Some(t) => t,
219                    None => unreachable!("multicore sender should keep at least 1"),
220                },
221            }
222        }
223
224        let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
225        // Make a clone to self.listener to avoid borrowing self
226        let listener = self.listener.clone();
227        loop {
228            tokio::select! {
229                _ = cleanup_timer.tick() => {
230                    // cleanup expired associations. iter() will remove expired elements
231                    self.assoc_map.cleanup_expired();
232                }
233
234                peer_addr_opt = self.keepalive_rx.recv() => {
235                    let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectedly");
236                    self.assoc_map.keep_alive(&peer_addr);
237                }
238
239                recv_result = Self::recv_one_packet(&self.context, &listener, &mut buffer) => {
240                    let (n, peer_addr, target_addr, control) = match recv_result {
241                        Some(s) => s,
242                        None => continue,
243                    };
244
245                    let data = &buffer[..n];
246                    if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, control, Bytes::copy_from_slice(data)).await {
247                        debug!(
248                            "udp packet relay {} with {} bytes failed, error: {}",
249                            peer_addr,
250                            data.len(),
251                            err
252                        );
253                    }
254                }
255
256                recv_result = multicore_recv(&mut orx_opt), if orx_opt.is_some() => {
257                    let (peer_addr, target_addr, control, data) = recv_result;
258                    let data_len = data.len();
259                    if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, control, data).await {
260                        debug!(
261                            "udp packet relay {} with {} bytes failed, error: {}",
262                            peer_addr,
263                            data_len,
264                            err
265                        );
266                    }
267                }
268            }
269        }
270    }
271
272    async fn recv_one_packet(
273        context: &ServiceContext,
274        l: &MonProxySocket<InboundUdpSocket>,
275        buffer: &mut [u8],
276    ) -> Option<(usize, SocketAddr, Address, Option<UdpSocketControlData>)> {
277        let (n, peer_addr, target_addr, control) = match l.recv_from_with_ctrl(buffer).await {
278            Ok(s) => s,
279            Err(err) => {
280                error!("udp server recv packet failed. {}", err);
281                return None;
282            }
283        };
284
285        if n == 0 {
286            // For windows, it will generate a ICMP Port Unreachable Message
287            // https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
288            // Which will result in recv_from return 0.
289            //
290            // It cannot be solved here, because `WSAGetLastError` is already set.
291            //
292            // See `relay::udprelay::utils::create_socket` for more detail.
293            return None;
294        }
295
296        if context.check_client_blocked(&peer_addr) {
297            warn!(
298                "udp client {} outbound {} access denied by ACL rules",
299                peer_addr, target_addr
300            );
301            return None;
302        }
303
304        if context.check_outbound_blocked(&target_addr).await {
305            warn!("udp client {} outbound {} blocked by ACL rules", peer_addr, target_addr);
306            return None;
307        }
308
309        Some((n, peer_addr, target_addr, control))
310    }
311
312    async fn send_packet(
313        &mut self,
314        listener: &Arc<MonProxySocket<InboundUdpSocket>>,
315        peer_addr: SocketAddr,
316        target_addr: Address,
317        control: Option<UdpSocketControlData>,
318        data: Bytes,
319    ) -> io::Result<()> {
320        match self.assoc_map {
321            NatMap::Association(ref mut m) => {
322                if let Some(assoc) = m.get(&peer_addr) {
323                    return assoc.try_send((peer_addr, target_addr, data, control));
324                }
325
326                let assoc = UdpAssociation::new_association(
327                    self.context.clone(),
328                    listener.clone(),
329                    peer_addr,
330                    self.keepalive_tx.clone(),
331                );
332
333                debug!("created udp association for {}", peer_addr);
334
335                assoc.try_send((peer_addr, target_addr, data, control))?;
336                m.insert(peer_addr, assoc);
337            }
338            #[cfg(feature = "aead-cipher-2022")]
339            NatMap::Session(ref mut m) => {
340                let xcontrol = match control {
341                    None => {
342                        error!("control is required for session based NAT, from {}", peer_addr);
343                        return Err(io::Error::other("control data missing in packet"));
344                    }
345                    Some(ref c) => c,
346                };
347
348                let client_session_id = xcontrol.client_session_id;
349
350                if let Some(assoc) = m.get(&client_session_id) {
351                    return assoc.try_send((peer_addr, target_addr, data, control));
352                }
353
354                let assoc = UdpAssociation::new_session(
355                    self.context.clone(),
356                    listener.clone(),
357                    peer_addr,
358                    self.keepalive_tx.clone(),
359                    client_session_id,
360                );
361
362                debug!(
363                    "created udp association for {} with session {}",
364                    peer_addr, client_session_id
365                );
366
367                assoc.try_send((peer_addr, target_addr, data, control))?;
368                m.insert(client_session_id, assoc);
369            }
370        }
371
372        Ok(())
373    }
374}
375
/// Message queued to an association: client address, target address, payload
/// and optional control data, in that order.
type UdpAssociationSendMessage = (SocketAddr, Address, Bytes, Option<UdpSocketControlData>);
377
/// Handle to a running association task, stored in the NAT map.
struct UdpAssociation {
    /// Handle of the spawned task that runs the association.
    assoc_handle: JoinHandle<()>,
    /// Channel used to queue client packets to the association task.
    sender: mpsc::Sender<UdpAssociationSendMessage>,
}
382
impl Drop for UdpAssociation {
    /// Aborts the association task when its handle is dropped, for example
    /// when the entry is removed from the NAT map.
    fn drop(&mut self) {
        self.assoc_handle.abort();
    }
}
388
389impl UdpAssociation {
390    fn new_association(
391        context: Arc<ServiceContext>,
392        inbound: Arc<MonProxySocket<InboundUdpSocket>>,
393        peer_addr: SocketAddr,
394        keepalive_tx: mpsc::Sender<NatKey>,
395    ) -> Self {
396        let (assoc_handle, sender) = UdpAssociationContext::create(context, inbound, peer_addr, keepalive_tx, None);
397        Self { assoc_handle, sender }
398    }
399
400    #[cfg(feature = "aead-cipher-2022")]
401    fn new_session(
402        context: Arc<ServiceContext>,
403        inbound: Arc<MonProxySocket<InboundUdpSocket>>,
404        peer_addr: SocketAddr,
405        keepalive_tx: mpsc::Sender<NatKey>,
406        client_session_id: u64,
407    ) -> Self {
408        let (assoc_handle, sender) =
409            UdpAssociationContext::create(context, inbound, peer_addr, keepalive_tx, Some(client_session_id));
410        Self { assoc_handle, sender }
411    }
412
413    fn try_send(&self, data: UdpAssociationSendMessage) -> io::Result<()> {
414        if self.sender.try_send(data).is_err() {
415            let err = io::Error::other("udp relay channel full");
416            return Err(err);
417        }
418        Ok(())
419    }
420}
421
/// Per-session state kept by an association that uses control data.
struct ClientSessionContext {
    /// Session ID chosen by the client; echoed back in every response.
    client_session_id: u64,
    /// Filter used to validate the packet IDs of incoming packets.
    packet_window_filter: PacketWindowFilter,
    /// User taken from the control data of the most recently accepted packet.
    client_user: Option<Arc<ServerUser>>,
}
427
impl ClientSessionContext {
    /// Creates the state for session `client_session_id` with a fresh packet
    /// window filter and no user.
    fn new(client_session_id: u64) -> Self {
        Self {
            client_session_id,
            packet_window_filter: PacketWindowFilter::new(),
            client_user: None,
        }
    }
}
437
/// State owned by the task that relays packets for a single client.
struct UdpAssociationContext {
    /// Shared service context (ACL checks, DNS resolution, connect options).
    context: Arc<ServiceContext>,
    /// Client address responses are sent to; updated when a session's address changes.
    peer_addr: SocketAddr,
    /// Outbound socket for IPv4 targets, created on first use.
    outbound_ipv4_socket: Option<OutboundUdpSocket>,
    /// Outbound socket for IPv6 targets, created on first use.
    outbound_ipv6_socket: Option<OutboundUdpSocket>,
    /// Channel used to ask the server to keep this association in its map.
    keepalive_tx: mpsc::Sender<NatKey>,
    /// Set when a response was relayed; cleared once a keep-alive was queued.
    keepalive_flag: bool,
    /// Inbound socket used to send responses back to the client.
    inbound: Arc<MonProxySocket<InboundUdpSocket>>,
    // AEAD 2022
    /// Client session state, present once control data is in use.
    client_session: Option<ClientSessionContext>,
    /// Randomly generated, non-zero ID sent in the control data of responses.
    server_session_id: u64,
    /// ID of the last response packet; incremented before every send.
    server_packet_id: u64,
}
451
impl Drop for UdpAssociationContext {
    /// Logs that the association has been closed.
    fn drop(&mut self) {
        debug!("udp association for {} is closed", self.peer_addr);
    }
}
457
thread_local! {
    // Per-thread random number generator seeded from the operating system.
    // Despite its name, it is the source of the server session IDs.
    static CLIENT_SESSION_RNG: RefCell<SmallRng> = RefCell::new(SmallRng::from_os_rng());
}
461
462#[inline]
463fn generate_server_session_id() -> u64 {
464    loop {
465        let id = CLIENT_SESSION_RNG.with(|rng| rng.borrow_mut().random());
466        if id != 0 {
467            break id;
468        }
469    }
470}
471
472impl UdpAssociationContext {
473    fn create(
474        context: Arc<ServiceContext>,
475        inbound: Arc<MonProxySocket<InboundUdpSocket>>,
476        peer_addr: SocketAddr,
477        keepalive_tx: mpsc::Sender<NatKey>,
478        client_session_id: Option<u64>,
479    ) -> (JoinHandle<()>, mpsc::Sender<UdpAssociationSendMessage>) {
480        // Pending packets UDP_ASSOCIATION_SEND_CHANNEL_SIZE for each association should be good enough for a server.
481        // If there are plenty of packets stuck in the channel, dropping excessive packets is a good way to protect the server from
482        // being OOM.
483        let (sender, receiver) = mpsc::channel(UDP_ASSOCIATION_SEND_CHANNEL_SIZE);
484
485        let mut assoc = Self {
486            context,
487            peer_addr,
488            outbound_ipv4_socket: None,
489            outbound_ipv6_socket: None,
490            keepalive_tx,
491            keepalive_flag: false,
492            inbound,
493            client_session: client_session_id.map(ClientSessionContext::new),
494            // server_session_id must be generated randomly
495            server_session_id: generate_server_session_id(),
496            server_packet_id: 0,
497        };
498        let handle = tokio::spawn(async move { assoc.dispatch_packet(receiver).await });
499
500        (handle, sender)
501    }
502
503    async fn dispatch_packet(&mut self, mut receiver: mpsc::Receiver<UdpAssociationSendMessage>) {
504        let mut outbound_ipv4_buffer = Vec::new();
505        let mut outbound_ipv6_buffer = Vec::new();
506        let mut keepalive_interval = time::interval(Duration::from_secs(1));
507
508        loop {
509            tokio::select! {
510                packet_received_opt = receiver.recv() => {
511                    let (peer_addr, target_addr, data, control) = match packet_received_opt {
512                        Some(d) => d,
513                        None => {
514                            trace!("udp association for {} -> ... channel closed", self.peer_addr);
515                            break;
516                        }
517                    };
518
519                    self.dispatch_received_packet(peer_addr, &target_addr, &data, &control).await;
520                }
521
522                received_opt = receive_from_outbound_opt(&self.outbound_ipv4_socket, &mut outbound_ipv4_buffer), if self.outbound_ipv4_socket.is_some() => {
523                    let (n, addr) = match received_opt {
524                        Ok(r) => r,
525                        Err(err) => {
526                            error!("udp relay {} <- ... failed, error: {}", self.peer_addr, err);
527                            // Socket failure. Reset for recreation.
528                            self.outbound_ipv4_socket = None;
529                            continue;
530                        }
531                    };
532
533                    let addr = Address::from(addr);
534                    self.send_received_respond_packet(addr, &outbound_ipv4_buffer[..n]).await;
535                }
536
537                received_opt = receive_from_outbound_opt(&self.outbound_ipv6_socket, &mut outbound_ipv6_buffer), if self.outbound_ipv6_socket.is_some() => {
538                    let (n, addr) = match received_opt {
539                        Ok(r) => r,
540                        Err(err) => {
541                            error!("udp relay {} <- ... failed, error: {}", self.peer_addr, err);
542                            // Socket failure. Reset for recreation.
543                            self.outbound_ipv6_socket = None;
544                            continue;
545                        }
546                    };
547
548                    let addr = Address::from(addr);
549                    self.send_received_respond_packet(addr, &outbound_ipv6_buffer[..n]).await;
550                }
551
552                _ = keepalive_interval.tick() => {
553                    if self.keepalive_flag {
554                        let nat_key = match self.client_session {
555                            None => NatKey::PeerAddr(self.peer_addr),
556                            #[cfg(feature = "aead-cipher-2022")]
557                            Some(ref s) => NatKey::SessionId(s.client_session_id),
558                            #[cfg(not(feature = "aead-cipher-2022"))]
559                            Some(..) => unreachable!("client_session_id is not None but aead-cipher-2022 is not enabled"),
560                        };
561
562                        if self.keepalive_tx.try_send(nat_key).is_err() {
563                            debug!("udp relay {:?} keep-alive failed, channel full or closed", nat_key);
564                        } else {
565                            self.keepalive_flag = false;
566                        }
567                    }
568                }
569            }
570        }
571
572        #[inline]
573        async fn receive_from_outbound_opt(
574            socket: &Option<OutboundUdpSocket>,
575            buf: &mut Vec<u8>,
576        ) -> io::Result<(usize, SocketAddr)> {
577            match *socket {
578                None => future::pending().await,
579                Some(ref s) => {
580                    if buf.is_empty() {
581                        buf.resize(MAXIMUM_UDP_PAYLOAD_SIZE, 0);
582                    }
583                    s.recv_from(buf).await
584                }
585            }
586        }
587    }
588
589    async fn dispatch_received_packet(
590        &mut self,
591        peer_addr: SocketAddr,
592        target_addr: &Address,
593        data: &[u8],
594        control: &Option<UdpSocketControlData>,
595    ) {
596        if let Some(ref mut session) = self.client_session
597            && peer_addr != self.peer_addr {
598                debug!(
599                    "udp relay for {} changed to {}, session: {:?}",
600                    self.peer_addr, peer_addr, session.client_session_id
601                );
602                self.peer_addr = peer_addr;
603            }
604
605        trace!(
606            "udp relay {} -> {} with {} bytes, control: {:?}",
607            self.peer_addr,
608            target_addr,
609            data.len(),
610            control,
611        );
612
613        if self.context.check_outbound_blocked(target_addr).await {
614            error!(
615                "udp client {} outbound {} blocked by ACL rules",
616                self.peer_addr, target_addr
617            );
618            return;
619        }
620
621        if let Some(control) = control {
622            // Check if Packet ID is in the window
623
624            let session_context = self
625                .client_session
626                .get_or_insert_with(|| ClientSessionContext::new(control.client_session_id));
627
628            let packet_id = control.packet_id;
629            if !session_context
630                .packet_window_filter
631                .validate_packet_id(packet_id, u64::MAX)
632            {
633                error!("udp client {} packet_id {} out of window", self.peer_addr, packet_id);
634                return;
635            }
636
637            session_context.client_user.clone_from(&control.user);
638        }
639
640        if let Err(err) = self.dispatch_received_outbound_packet(target_addr, data).await {
641            error!(
642                "udp relay {} -> {} with {} bytes, error: {}",
643                self.peer_addr,
644                target_addr,
645                data.len(),
646                err
647            );
648        }
649    }
650
651    async fn dispatch_received_outbound_packet(&mut self, target_addr: &Address, data: &[u8]) -> io::Result<()> {
652        match *target_addr {
653            Address::SocketAddress(sa) => self.send_received_outbound_packet(sa, data).await,
654            Address::DomainNameAddress(ref dname, port) => {
655                lookup_then!(self.context.context_ref(), dname, port, |sa| {
656                    self.send_received_outbound_packet(sa, data).await
657                })
658                .map(|_| ())
659            }
660        }
661    }
662
663    async fn send_received_outbound_packet(&mut self, original_target_addr: SocketAddr, data: &[u8]) -> io::Result<()> {
664        let ip_stack_caps = get_ip_stack_capabilities();
665
666        let target_addr = match original_target_addr {
667            SocketAddr::V4(ref v4) => {
668                // If IPv4-mapped-IPv6 is supported.
669                // Converts IPv4 address to IPv4-mapped-IPv6
670                // All sockets will be created in IPv6 (nearly all modern OS supports IPv6 sockets)
671                if ip_stack_caps.support_ipv4_mapped_ipv6 {
672                    SocketAddr::new(v4.ip().to_ipv6_mapped().into(), v4.port())
673                } else {
674                    original_target_addr
675                }
676            }
677            SocketAddr::V6(ref v6) => {
678                // If IPv6 is not supported. Try to map it back to IPv4.
679                if !ip_stack_caps.support_ipv6 || !ip_stack_caps.support_ipv4_mapped_ipv6 {
680                    match v6.ip().to_ipv4_mapped() {
681                        Some(v4) => SocketAddr::new(v4.into(), v6.port()),
682                        None => original_target_addr,
683                    }
684                } else {
685                    original_target_addr
686                }
687            }
688        };
689
690        let socket = match target_addr {
691            SocketAddr::V4(..) => match self.outbound_ipv4_socket {
692                Some(ref mut socket) => socket,
693                None => {
694                    let socket =
695                        OutboundUdpSocket::connect_any_with_opts(AddrFamily::Ipv4, self.context.connect_opts_ref())
696                            .await?;
697                    self.outbound_ipv4_socket.insert(socket)
698                }
699            },
700            SocketAddr::V6(..) => match self.outbound_ipv6_socket {
701                Some(ref mut socket) => socket,
702                None => {
703                    let socket =
704                        OutboundUdpSocket::connect_any_with_opts(AddrFamily::Ipv6, self.context.connect_opts_ref())
705                            .await?;
706                    self.outbound_ipv6_socket.insert(socket)
707                }
708            },
709        };
710
711        match socket.send_to(data, target_addr).await {
712            Ok(n) => {
713                if n != data.len() {
714                    warn!(
715                        "{} -> {} sent {} bytes != expected {} bytes",
716                        self.peer_addr,
717                        target_addr,
718                        n,
719                        data.len()
720                    );
721                }
722                Ok(())
723            }
724            Err(err) => Err(err),
725        }
726    }
727
728    async fn send_received_respond_packet(&mut self, mut addr: Address, data: &[u8]) {
729        trace!("udp relay {} <- {} received {} bytes", self.peer_addr, addr, data.len());
730
731        // Keep association alive in map
732        self.keepalive_flag = true;
733
734        // Convert IPv4-mapped-IPv6 to IPv4
735        //
736        // It is an undefined behavior in shadowsocks' protocol about how to handle IPv4-mapped-IPv6.
737        // But for some implementations, they may expect the target address to be IPv4, because
738        // the peer address is IPv4 when calling `sendto`.
739        if let Address::SocketAddress(SocketAddr::V6(ref v6)) = addr
740            && let Some(v4) = to_ipv4_mapped(v6.ip()) {
741                addr = Address::SocketAddress(SocketAddr::new(v4.into(), v6.port()));
742            }
743
744        match self.client_session {
745            None => {
746                // Naive route, send data directly back to client without session
747                match self.inbound.send_to(self.peer_addr, &addr, data).await {
748                    Err(err) => {
749                        warn!(
750                            "udp failed to send back {} bytes to client {}, from target {}, error: {}",
751                            data.len(),
752                            self.peer_addr,
753                            addr,
754                            err
755                        );
756                    }
757                    _ => {
758                        trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len());
759                    }
760                }
761            }
762            Some(ref client_session) => {
763                // AEAD 2022, client session
764
765                // Increase Packet ID before send
766                self.server_packet_id = match self.server_packet_id.checked_add(1) {
767                    Some(i) => i,
768                    None => {
769                        // FIXME: server_packet_id overflowed. There is no way to recover from this error.
770                        //
771                        // Application clients may open a new session when it couldn't receive proper respond.
772
773                        warn!(
774                            "udp failed to send back {} bytes to client {}, from target {}, server packet id overflowed",
775                            data.len(),
776                            self.peer_addr,
777                            addr
778                        );
779                        return;
780                    }
781                };
782
783                let mut control = UdpSocketControlData::default();
784                control.client_session_id = client_session.client_session_id;
785                control.server_session_id = self.server_session_id;
786                control.packet_id = self.server_packet_id;
787                control.user.clone_from(&client_session.client_user);
788
789                match self
790                    .inbound
791                    .send_to_with_ctrl(self.peer_addr, &addr, &control, data)
792                    .await
793                {
794                    Err(err) => {
795                        warn!(
796                            "udp failed to send back {} bytes to client {}, from target {}, control: {:?}, error: {}",
797                            data.len(),
798                            self.peer_addr,
799                            addr,
800                            control,
801                            err
802                        );
803                    }
804                    _ => {
805                        trace!(
806                            "udp relay {} <- {} with {} bytes, control {:?}",
807                            self.peer_addr,
808                            addr,
809                            data.len(),
810                            control
811                        );
812                    }
813                }
814            }
815        }
816    }
817}