ombrac_client/endpoint/socks/
mod.rs

1mod v5;
2
3use std::io;
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7use std::{error::Error, io::Cursor};
8
9use ombrac::prelude::*;
10use ombrac_macros::{error, info, try_or_return};
11use ombrac_transport::Transport;
12use socks_lib::socks5::Address as Socks5Address;
13use socks_lib::ToBytes;
14use socks_lib::{socks5::UdpPacket, Streamable};
15use tokio::net::{TcpListener, TcpStream, UdpSocket};
16use tokio::time::timeout;
17
18use crate::Client;
19
20pub struct Server {}
21
22pub enum Request {
23    TcpConnect(TcpStream, Address),
24
25    #[cfg(feature = "datagram")]
26    UdpAssociate(TcpStream, UdpSocket),
27}
28
29impl Server {
30    pub async fn listen<T>(addr: SocketAddr, ombrac: Client<T>) -> Result<(), Box<dyn Error>>
31    where
32        T: Transport + Send + Sync + 'static,
33    {
34        use ombrac::io::util::copy_bidirectional;
35
36        let ombrac = Arc::new(ombrac);
37        let listener = TcpListener::bind(addr).await?;
38
39        info!("SOCKS server listening on {}", listener.local_addr()?);
40
41        while let Ok((stream, _addr)) = listener.accept().await {
42            let ombrac = ombrac.clone();
43
44            tokio::spawn(async move {
45                let request = try_or_return!(Self::handler_v5(stream).await);
46
47                match request {
48                    Request::TcpConnect(mut inbound, addr) => {
49                        let outbound_result =
50                            timeout(Duration::from_secs(30), ombrac.tcp_connect(addr.clone()))
51                                .await;
52
53                        let mut outbound = match outbound_result {
54                            Ok(Ok(stream)) => stream,
55                            Ok(Err(_error)) => {
56                                error!("Failed to connect to {:?}: {}", addr, _error);
57                                return;
58                            }
59                            Err(_) => {
60                                error!("Connection to {:?} timed out", addr);
61                                return;
62                            }
63                        };
64
65                        let _bytes =
66                            try_or_return!(copy_bidirectional(&mut inbound, &mut outbound).await);
67
68                        info!(
69                            "TCP Connect {:?} Send {}, Receive {}",
70                            addr, _bytes.0, _bytes.1
71                        );
72                    }
73
74                    #[cfg(feature = "datagram")]
75                    Request::UdpAssociate(stream, socket) => {
76                        let unr = ombrac.udp_associate().await.unwrap();
77
78                        let socks_1 = Arc::new(socket);
79                        let socks_2 = socks_1.clone();
80                        let datagram_recv = Arc::new(unr);
81                        let datagram_send = datagram_recv.clone();
82
83                        let mut buf = [0u8; 2048];
84
85                        let (len, client_socks_addr) = socks_2.recv_from(&mut buf).await.unwrap();
86                        info!("Udp Associate from {}, {}", client_socks_addr, len);
87
88                        let data = buf[..len].to_vec();
89                        let socks_packet = UdpPacket::read(&mut Cursor::new(data)).await.unwrap();
90
91                        let addr = match socks_packet.address {
92                            Socks5Address::Domain(domain, port) => Address::Domain(domain, port),
93                            Socks5Address::IPv4(addr) => Address::IPv4(addr),
94                            Socks5Address::IPv6(addr) => Address::IPv6(addr),
95                        };
96                        let data = socks_packet.data;
97
98                        datagram_send.send(addr, data).await.unwrap();
99
100                        let handle = tokio::spawn(async move {
101                            loop {
102                                let (len, _addr) = socks_2.recv_from(&mut buf).await.unwrap();
103                                let data = buf[..len].to_vec();
104                                let socks_packet =
105                                    UdpPacket::read(&mut Cursor::new(data)).await.unwrap();
106
107                                let addr = match socks_packet.address {
108                                    Socks5Address::Domain(domain, port) => {
109                                        Address::Domain(domain, port)
110                                    }
111                                    Socks5Address::IPv4(addr) => Address::IPv4(addr),
112                                    Socks5Address::IPv6(addr) => Address::IPv6(addr),
113                                };
114                                let data = socks_packet.data;
115
116                                if let Err(_error) = datagram_send.send(addr, data).await {
117                                    error!("UDP Datagram connection close: {}", _error);
118
119                                    break;
120                                }
121                            }
122                        });
123
124                        let send_handle = tokio::spawn(async move {
125                            while let Ok((addr, data)) = datagram_recv.recv().await {
126                                info!("UDP recv from remote {:?} {:?}", addr, data.len());
127                                let addr = match addr {
128                                    Address::Domain(domain, port) => {
129                                        Socks5Address::Domain(domain, port)
130                                    }
131                                    Address::IPv4(addr) => Socks5Address::IPv4(addr),
132                                    Address::IPv6(addr) => Socks5Address::IPv6(addr),
133                                };
134                                let data = UdpPacket::un_frag(addr, data.into());
135
136                                if socks_1
137                                    .send_to(&data.to_bytes(), client_socks_addr)
138                                    .await
139                                    .is_err()
140                                {
141                                    break;
142                                }
143                            }
144                        });
145
146                        loop {
147                            tokio::time::sleep(Duration::from_secs(10)).await;
148
149                            let mut buf = [0u8; 1];
150                            let is_closed = match stream.peek(&mut buf).await {
151                                Ok(0) => true,
152                                Ok(_) => false,
153                                Err(e) if e.kind() == io::ErrorKind::WouldBlock => false,
154                                Err(_) => true,
155                            };
156
157                            if is_closed {
158                                break;
159                            }
160                        }
161
162                        handle.abort();
163                        send_handle.abort();
164                    }
165                };
166            });
167        }
168
169        Ok(())
170    }
171}