ombrac_client/endpoint/socks/
mod.rs1mod v5;
2
3use std::io;
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7use std::{error::Error, io::Cursor};
8
9use ombrac::prelude::*;
10use ombrac_macros::{error, info, try_or_return};
11use ombrac_transport::Initiator;
12use socks_lib::socks5::Address as Socks5Address;
13use socks_lib::ToBytes;
14use socks_lib::{socks5::UdpPacket, Streamable};
15use tokio::net::{TcpListener, TcpStream, UdpSocket};
16use tokio::time::timeout;
17
18use crate::Client;
19
/// Local SOCKS endpoint.
///
/// The type carries no state; it only namespaces the associated functions
/// (such as `listen`) that accept client connections and relay them through
/// an ombrac `Client`.
pub struct Server {}
21
/// A client request as returned by `Server::handler_v5` and dispatched by
/// `Server::listen`.
pub enum Request {
    /// TCP relay: the accepted client stream together with the destination
    /// address that `listen` hands to the tunnel's `tcp_connect`.
    TcpConnect(TcpStream, Address),

    /// UDP relay: the client's TCP stream, which `listen` only peeks at to
    /// notice when the client goes away, and the UDP socket on which the
    /// client's SOCKS datagrams are received and replies are sent back.
    #[cfg(feature = "datagram")]
    UdpAssociate(TcpStream, UdpSocket),
}
28
29impl Server {
30 pub async fn listen<T>(addr: SocketAddr, ombrac: Client<T>) -> Result<(), Box<dyn Error>>
31 where
32 T: Initiator + Send + Sync + 'static,
33 {
34 use ombrac::io::util::copy_bidirectional;
35
36 let ombrac = Arc::new(ombrac);
37 let listener = TcpListener::bind(addr).await?;
38
39 info!("SOCKS server listening on {}", listener.local_addr()?);
40
41 while let Ok((stream, _addr)) = listener.accept().await {
42 let ombrac = ombrac.clone();
43
44 tokio::spawn(async move {
45 let request = try_or_return!(Self::handler_v5(stream).await);
46
47 match request {
48 Request::TcpConnect(mut inbound, addr) => {
49 let outbound_result =
50 timeout(Duration::from_secs(30), ombrac.tcp_connect(addr.clone()))
51 .await;
52
53 let mut outbound = match outbound_result {
54 Ok(Ok(stream)) => stream,
55 Ok(Err(_error)) => {
56 error!("Failed to connect to {:?}: {}", addr, _error);
57 return;
58 }
59 Err(_) => {
60 error!("Connection to {:?} timed out", addr);
61 return;
62 }
63 };
64
65 let _bytes =
66 try_or_return!(copy_bidirectional(&mut inbound, &mut outbound).await);
67
68 info!(
69 "TCP Connect {:?} Send {}, Receive {}",
70 addr, _bytes.0, _bytes.1
71 );
72 }
73
74 #[cfg(feature = "datagram")]
75 Request::UdpAssociate(stream, socket) => {
76 let unr = ombrac.udp_associate().await.unwrap();
77
78 let socks_1 = Arc::new(socket);
79 let socks_2 = socks_1.clone();
80 let datagram_recv = Arc::new(unr);
81 let datagram_send = datagram_recv.clone();
82
83 let mut buf = [0u8; 2048];
84
85 let (len, client_socks_addr) = socks_2.recv_from(&mut buf).await.unwrap();
86 info!("Udp Associate from {}, {}", client_socks_addr, len);
87
88 let data = buf[..len].to_vec();
89 let socks_packet = UdpPacket::read(&mut Cursor::new(data)).await.unwrap();
90
91 let addr = match socks_packet.address {
92 Socks5Address::Domain(domain, port) => Address::Domain(domain, port),
93 Socks5Address::IPv4(addr) => Address::IPv4(addr),
94 Socks5Address::IPv6(addr) => Address::IPv6(addr),
95 };
96 let data = socks_packet.data;
97
98 datagram_send.send(addr, data).await.unwrap();
99
100 let handle = tokio::spawn(async move {
101 loop {
102 let (len, _addr) = socks_2.recv_from(&mut buf).await.unwrap();
103 let data = buf[..len].to_vec();
104 let socks_packet =
105 UdpPacket::read(&mut Cursor::new(data)).await.unwrap();
106
107 let addr = match socks_packet.address {
108 Socks5Address::Domain(domain, port) => {
109 Address::Domain(domain, port)
110 }
111 Socks5Address::IPv4(addr) => Address::IPv4(addr),
112 Socks5Address::IPv6(addr) => Address::IPv6(addr),
113 };
114 let data = socks_packet.data;
115
116 if let Err(_error) = datagram_send.send(addr, data).await {
117 error!("UDP Datagram connection close: {}", _error);
118
119 break;
120 }
121 }
122 });
123
124 let send_handle = tokio::spawn(async move {
125 while let Ok((addr, data)) = datagram_recv.recv().await {
126 info!("UDP recv from remote {:?} {:?}", addr, data.len());
127 let addr = match addr {
128 Address::Domain(domain, port) => {
129 Socks5Address::Domain(domain, port)
130 }
131 Address::IPv4(addr) => Socks5Address::IPv4(addr),
132 Address::IPv6(addr) => Socks5Address::IPv6(addr),
133 };
134 let data = UdpPacket::un_frag(addr, data.into());
135
136 if socks_1
137 .send_to(&data.to_bytes(), client_socks_addr)
138 .await
139 .is_err()
140 {
141 break;
142 }
143 }
144 });
145
146 loop {
147 tokio::time::sleep(Duration::from_secs(10)).await;
148
149 let mut buf = [0u8; 1];
150 let is_closed = match stream.peek(&mut buf).await {
151 Ok(0) => true,
152 Ok(_) => false,
153 Err(e) if e.kind() == io::ErrorKind::WouldBlock => false,
154 Err(_) => true,
155 };
156
157 if is_closed {
158 break;
159 }
160 }
161
162 handle.abort();
163 send_handle.abort();
164 }
165 };
166 });
167 }
168
169 Ok(())
170 }
171}