1mod android;
2mod api;
3mod config;
4mod dns;
5mod dump_logger;
6
7use hickory_proto::op::{Message, Query};
8use moka::future::Cache;
9use socks5_impl::{
10 Error, Result, client,
11 protocol::{Address, UserKey},
12};
13use std::{net::SocketAddr, sync::Arc, time::Duration};
14use tokio::{
15 io::{AsyncReadExt, AsyncWriteExt, BufStream},
16 net::{TcpListener, TcpStream, ToSocketAddrs, UdpSocket},
17};
18
19pub use ::tokio_util::sync::CancellationToken;
20pub use api::{dns2socks_start, dns2socks_stop};
21pub use config::{ArgProxy, ArgVerbosity, Config, ProxyType};
22pub use dump_logger::dns2socks_set_log_callback;
23
24pub const LIB_NAME: &str = "dns2socks_core";
25
26const MAX_BUFFER_SIZE: usize = 4096;
27
28pub async fn main_entry(config: Config, shutdown_token: tokio_util::sync::CancellationToken) -> Result<()> {
29 log::info!("Starting DNS2Socks listening on {}...", config.listen_addr);
30 let user_key = config.socks5_settings.credentials.clone();
31
32 let timeout = Duration::from_secs(config.timeout);
33
34 let cache = create_dns_cache();
35
36 fn handle_error(res: Result<Result<(), Error>, tokio::task::JoinError>, protocol: &str) {
37 match res {
38 Ok(Err(e)) => log::error!("{} error \"{}\"", protocol, e),
39 Err(e) => log::error!("{} error \"{}\"", protocol, e),
40 _ => {}
41 }
42 }
43
44 tokio::select! {
45 _ = shutdown_token.cancelled() => {
46 log::info!("Shutdown received");
47 },
48 res = tokio::spawn(udp_thread(config.clone(), user_key.clone(), cache.clone(), timeout)) => {
49 handle_error(res, "UDP");
50 },
51 res = tokio::spawn(tcp_thread(config, user_key, cache, timeout)) => {
52 handle_error(res, "TCP");
53 },
54 }
55
56 log::info!("DNS2Socks stopped");
57
58 Ok(())
59}
60
61pub(crate) async fn udp_thread(opt: Config, user_key: Option<UserKey>, cache: Cache<Vec<Query>, Message>, timeout: Duration) -> Result<()> {
62 let listener = match UdpSocket::bind(&opt.listen_addr).await {
63 Ok(listener) => listener,
64 Err(e) => {
65 log::error!("UDP listener {} error \"{}\"", opt.listen_addr, e);
66 return Err(e.into());
67 }
68 };
69 let listener = Arc::new(listener);
70 log::info!("Udp listening on: {}", opt.listen_addr);
71
72 loop {
73 let listener = listener.clone();
74 let opt = opt.clone();
75 let cache = cache.clone();
76 let auth = user_key.clone();
77 let block = async move {
78 let mut buf = vec![0u8; MAX_BUFFER_SIZE];
79 let (len, src) = listener.recv_from(&mut buf).await?;
80 buf.resize(len, 0);
81 tokio::spawn(async move {
82 if let Err(e) = udp_incoming_handler(listener, buf, src, opt, cache, auth, timeout).await {
83 log::error!("DNS query via UDP incoming handler error \"{}\"", e);
84 }
85 });
86 Ok::<(), Error>(())
87 };
88 if let Err(e) = block.await {
89 log::error!("UDP listener error \"{}\"", e);
90 }
91 }
92}
93
94async fn udp_incoming_handler(
95 listener: Arc<UdpSocket>,
96 mut buf: Vec<u8>,
97 src: SocketAddr,
98 opt: Config,
99 cache: Cache<Vec<Query>, Message>,
100 auth: Option<UserKey>,
101 timeout: Duration,
102) -> Result<()> {
103 let message = dns::parse_data_to_dns_message(&buf, false)?;
104 let domain = dns::extract_domain_from_dns_message(&message)?;
105
106 if opt.cache_records
107 && let Some(cached_message) = dns_cache_get_message(&cache, &message).await
108 {
109 let data = cached_message.to_vec().map_err(|e| e.to_string())?;
110 listener.send_to(&data, &src).await?;
111 log_dns_message("DNS query via UDP cache hit", &domain, &cached_message);
112 return Ok(());
113 }
114
115 let proxy_addr = opt.socks5_settings.addr;
116 let dest_addr = opt.dns_remote_server;
117
118 let data = if opt.force_tcp {
119 let mut new_buf = (buf.len() as u16).to_be_bytes().to_vec();
120 new_buf.append(&mut buf);
121 tcp_via_socks5_server(proxy_addr, dest_addr, auth, &new_buf, timeout)
122 .await
123 .map_err(|e| format!("querying \"{domain}\" {e}"))?
124 } else {
125 client::UdpClientImpl::datagram(proxy_addr, dest_addr, auth)
126 .await
127 .map_err(|e| format!("preparing to query \"{domain}\" {e}"))?
128 .transfer_data(&buf, timeout)
129 .await
130 .map_err(|e| format!("querying \"{domain}\" {e}"))?
131 };
132 let message = dns::parse_data_to_dns_message(&data, opt.force_tcp)?;
133 let msg_buf = message.to_vec().map_err(|e| e.to_string())?;
134
135 listener.send_to(&msg_buf, &src).await?;
136
137 let prefix = format!("DNS query via {}", if opt.force_tcp { "TCP" } else { "UDP" });
138 log_dns_message(&prefix, &domain, &message);
139 if opt.cache_records {
140 dns_cache_put_message(&cache, &message).await;
141 }
142 Ok::<(), Error>(())
143}
144
145pub(crate) async fn tcp_thread(opt: Config, user_key: Option<UserKey>, cache: Cache<Vec<Query>, Message>, timeout: Duration) -> Result<()> {
146 let listener = match TcpListener::bind(&opt.listen_addr).await {
147 Ok(listener) => listener,
148 Err(e) => {
149 log::error!("TCP listener {} error \"{}\"", opt.listen_addr, e);
150 return Err(e.into());
151 }
152 };
153 log::info!("TCP listening on: {}", opt.listen_addr);
154
155 while let Ok((mut incoming, _)) = listener.accept().await {
156 let opt = opt.clone();
157 let user_key = user_key.clone();
158 let cache = cache.clone();
159 tokio::spawn(async move {
160 if let Err(e) = handle_tcp_incoming(&opt, user_key, cache, &mut incoming, timeout).await {
161 log::error!("TCP error \"{}\"", e);
162 }
163 });
164 }
165 Ok(())
166}
167
168async fn handle_tcp_incoming(
169 opt: &Config,
170 auth: Option<UserKey>,
171 cache: Cache<Vec<Query>, Message>,
172 incoming: &mut TcpStream,
173 timeout: Duration,
174) -> Result<()> {
175 let mut buf = [0u8; MAX_BUFFER_SIZE];
176 let n = tokio::time::timeout(timeout, incoming.read(&mut buf)).await??;
177
178 let message = dns::parse_data_to_dns_message(&buf[..n], true)?;
179 let domain = dns::extract_domain_from_dns_message(&message)?;
180
181 if opt.cache_records
182 && let Some(cached_message) = dns_cache_get_message(&cache, &message).await
183 {
184 let data = cached_message.to_vec().map_err(|e| e.to_string())?;
185 let len = u16::try_from(data.len()).map_err(|e| e.to_string())?.to_be_bytes().to_vec();
186 let data = [len, data].concat();
187 incoming.write_all(&data).await?;
188 log_dns_message("DNS query via TCP cache hit", &domain, &cached_message);
189 return Ok(());
190 }
191
192 let proxy_addr = opt.socks5_settings.addr;
193 let target_server = opt.dns_remote_server;
194 let response_buf = tcp_via_socks5_server(proxy_addr, target_server, auth, &buf[..n], timeout).await?;
195
196 incoming.write_all(&response_buf).await?;
197
198 let message = dns::parse_data_to_dns_message(&response_buf, true)?;
199 log_dns_message("DNS query via TCP", &domain, &message);
200
201 if opt.cache_records {
202 dns_cache_put_message(&cache, &message).await;
203 }
204
205 Ok(())
206}
207
208async fn tcp_via_socks5_server<A, B>(
209 proxy_addr: A,
210 target_server: B,
211 auth: Option<UserKey>,
212 buf: &[u8],
213 timeout: Duration,
214) -> Result<Vec<u8>>
215where
216 A: ToSocketAddrs,
217 B: Into<Address>,
218{
219 let s5_proxy = TcpStream::connect(proxy_addr).await?;
220 let mut stream = BufStream::new(s5_proxy);
221 let _addr = client::connect(&mut stream, target_server, auth).await?;
222
223 stream.write_all(buf).await?;
224 stream.flush().await?;
225
226 let mut buf = vec![0; MAX_BUFFER_SIZE];
227 let n = tokio::time::timeout(timeout, stream.read(&mut buf)).await??;
228 Ok(buf[..n].to_vec())
229}
230
231fn log_dns_message(prefix: &str, domain: &str, message: &Message) {
232 let ipaddr = match dns::extract_ipaddr_from_dns_message(message) {
233 Ok(ipaddr) => {
234 format!("{:?}", ipaddr)
235 }
236 Err(e) => e.to_string(),
237 };
238 log::trace!("{} {:?} <==> {:?}", prefix, domain, ipaddr);
239}
240
241pub(crate) fn create_dns_cache() -> Cache<Vec<Query>, Message> {
242 Cache::builder()
243 .time_to_live(Duration::from_secs(30 * 60))
244 .time_to_idle(Duration::from_secs(5 * 60))
245 .build()
246}
247
248pub(crate) async fn dns_cache_get_message(cache: &Cache<Vec<Query>, Message>, message: &Message) -> Option<Message> {
249 if let Some(mut cached_message) = cache.get(&message.queries().to_vec()).await {
250 cached_message.set_id(message.id());
251 return Some(cached_message);
252 }
253 None
254}
255
256pub(crate) async fn dns_cache_put_message(cache: &Cache<Vec<Query>, Message>, message: &Message) {
257 cache.insert(message.queries().to_vec(), message.clone()).await;
258}