Skip to main content

dns2socks_core/
lib.rs

1mod android;
2mod api;
3mod config;
4mod dns;
5mod dump_logger;
6
7use hickory_proto::op::{Message, Query};
8use moka::future::Cache;
9use socks5_impl::{
10    Error, Result, client,
11    protocol::{Address, UserKey},
12};
13use std::{net::SocketAddr, sync::Arc, time::Duration};
14use tokio::{
15    io::{AsyncReadExt, AsyncWriteExt, BufStream},
16    net::{TcpListener, TcpStream, ToSocketAddrs, UdpSocket},
17};
18
19pub use ::tokio_util::sync::CancellationToken;
20pub use api::{dns2socks_start, dns2socks_stop};
21pub use config::{ArgProxy, ArgVerbosity, Config, ProxyType};
22pub use dump_logger::dns2socks_set_log_callback;
23
24pub const LIB_NAME: &str = "dns2socks_core";
25
26const MAX_BUFFER_SIZE: usize = 4096;
27
28pub async fn main_entry(config: Config, shutdown_token: tokio_util::sync::CancellationToken) -> Result<()> {
29    log::info!("Starting DNS2Socks listening on {}...", config.listen_addr);
30    let user_key = config.socks5_settings.credentials.clone();
31
32    let timeout = Duration::from_secs(config.timeout);
33
34    let cache = create_dns_cache();
35    let shutdown_for_select = shutdown_token.clone();
36    tokio::select! {
37        _ = shutdown_for_select.cancelled() => {
38            log::info!("Shutdown received");
39        },
40        res = udp_thread(config.clone(), user_key.clone(), cache.clone(), timeout, shutdown_token.clone()) => {
41            res?;
42        },
43        res = tcp_thread(config, user_key, cache, timeout, shutdown_token) => {
44            res?;
45        },
46    }
47
48    log::info!("DNS2Socks stopped");
49
50    Ok(())
51}
52
53pub(crate) async fn udp_thread(
54    opt: Config,
55    user_key: Option<UserKey>,
56    cache: Cache<Vec<Query>, Message>,
57    timeout: Duration,
58    shutdown_token: tokio_util::sync::CancellationToken,
59) -> Result<()> {
60    let listener = match UdpSocket::bind(&opt.listen_addr).await {
61        Ok(listener) => listener,
62        Err(e) => {
63            log::error!("UDP listener {} error \"{}\"", opt.listen_addr, e);
64            return Err(e.into());
65        }
66    };
67    let listener = Arc::new(listener);
68    log::info!("Udp listening on: {}", opt.listen_addr);
69
70    loop {
71        let listener = listener.clone();
72        let opt = opt.clone();
73        let cache = cache.clone();
74        let auth = user_key.clone();
75        tokio::select! {
76            _ = shutdown_token.cancelled() => {
77                log::info!("UDP shutdown received");
78                return Ok(());
79            }
80            res = async move {
81                let mut buf = vec![0u8; MAX_BUFFER_SIZE];
82                let (len, src) = listener.recv_from(&mut buf).await?;
83                buf.resize(len, 0);
84                tokio::spawn(async move {
85                    if let Err(e) = udp_incoming_handler(listener, buf, src, opt, cache, auth, timeout).await {
86                        log::error!("DNS query via UDP incoming handler error \"{}\"", e);
87                    }
88                });
89                Ok::<(), Error>(())
90            } => {
91                if let Err(e) = res {
92                    log::error!("UDP listener error \"{}\"", e);
93                    return Err(e);
94                }
95            }
96        }
97    }
98}
99
100async fn udp_incoming_handler(
101    listener: Arc<UdpSocket>,
102    mut buf: Vec<u8>,
103    src: SocketAddr,
104    opt: Config,
105    cache: Cache<Vec<Query>, Message>,
106    auth: Option<UserKey>,
107    timeout: Duration,
108) -> Result<()> {
109    let message = dns::parse_data_to_dns_message(&buf, false)?;
110    let domain = dns::extract_domain_from_dns_message(&message)?;
111
112    if opt.cache_records
113        && let Some(cached_message) = dns_cache_get_message(&cache, &message).await
114    {
115        let data = cached_message.to_vec().map_err(|e| e.to_string())?;
116        listener.send_to(&data, &src).await?;
117        log_dns_message("DNS query via UDP cache hit", &domain, &cached_message);
118        return Ok(());
119    }
120
121    let proxy_addr = opt.socks5_settings.addr;
122    let dest_addr = opt.dns_remote_server;
123
124    let data = if opt.force_tcp {
125        let mut new_buf = (buf.len() as u16).to_be_bytes().to_vec();
126        new_buf.append(&mut buf);
127        tcp_via_socks5_server(proxy_addr, dest_addr, auth, &new_buf, timeout)
128            .await
129            .map_err(|e| format!("querying \"{domain}\" {e}"))?
130    } else {
131        client::UdpClientImpl::datagram(proxy_addr, dest_addr, auth)
132            .await
133            .map_err(|e| format!("preparing to query \"{domain}\" {e}"))?
134            .transfer_data(&buf, timeout)
135            .await
136            .map_err(|e| format!("querying \"{domain}\" {e}"))?
137    };
138    let message = dns::parse_data_to_dns_message(&data, opt.force_tcp)?;
139    let msg_buf = message.to_vec().map_err(|e| e.to_string())?;
140
141    listener.send_to(&msg_buf, &src).await?;
142
143    let prefix = format!("DNS query via {}", if opt.force_tcp { "TCP" } else { "UDP" });
144    log_dns_message(&prefix, &domain, &message);
145    if opt.cache_records {
146        dns_cache_put_message(&cache, &message).await;
147    }
148    Ok::<(), Error>(())
149}
150
151pub(crate) async fn tcp_thread(
152    opt: Config,
153    user_key: Option<UserKey>,
154    cache: Cache<Vec<Query>, Message>,
155    timeout: Duration,
156    shutdown_token: tokio_util::sync::CancellationToken,
157) -> Result<()> {
158    let listener = match TcpListener::bind(&opt.listen_addr).await {
159        Ok(listener) => listener,
160        Err(e) => {
161            log::error!("TCP listener {} error \"{}\"", opt.listen_addr, e);
162            return Err(e.into());
163        }
164    };
165    log::info!("TCP listening on: {}", opt.listen_addr);
166
167    loop {
168        tokio::select! {
169            _ = shutdown_token.cancelled() => {
170                log::info!("TCP shutdown received");
171                return Ok(());
172            }
173            res = listener.accept() => {
174                let (mut incoming, _) = match res {
175                    Ok(conn) => conn,
176                    Err(e) => {
177                        log::error!("TCP listener {} error \"{}\"", opt.listen_addr, e);
178                        return Err(e.into());
179                    }
180                };
181                let opt = opt.clone();
182                let user_key = user_key.clone();
183                let cache = cache.clone();
184                tokio::spawn(async move {
185                    if let Err(e) = handle_tcp_incoming(&opt, user_key, cache, &mut incoming, timeout).await {
186                        log::error!("TCP error \"{}\"", e);
187                    }
188                });
189            }
190        };
191    }
192}
193
194async fn handle_tcp_incoming(
195    opt: &Config,
196    auth: Option<UserKey>,
197    cache: Cache<Vec<Query>, Message>,
198    incoming: &mut TcpStream,
199    timeout: Duration,
200) -> Result<()> {
201    let mut len_buf = [0u8; 2];
202    tokio::time::timeout(timeout, incoming.read_exact(&mut len_buf)).await??;
203    let len = u16::from_be_bytes(len_buf) as usize;
204    let mut msg_buf = vec![0u8; len];
205    tokio::time::timeout(timeout, incoming.read_exact(&mut msg_buf)).await??;
206
207    let mut buf = len_buf.to_vec();
208    buf.extend(msg_buf);
209
210    let message = dns::parse_data_to_dns_message(&buf, true)?;
211    let domain = dns::extract_domain_from_dns_message(&message)?;
212
213    if opt.cache_records
214        && let Some(cached_message) = dns_cache_get_message(&cache, &message).await
215    {
216        let data = cached_message.to_vec().map_err(|e| e.to_string())?;
217        let len = u16::try_from(data.len()).map_err(|e| e.to_string())?.to_be_bytes().to_vec();
218        let data = [len, data].concat();
219        incoming.write_all(&data).await?;
220        log_dns_message("DNS query via TCP cache hit", &domain, &cached_message);
221        return Ok(());
222    }
223
224    let proxy_addr = opt.socks5_settings.addr;
225    let target_server = opt.dns_remote_server;
226    let response_buf = tcp_via_socks5_server(proxy_addr, target_server, auth, &buf, timeout).await?;
227
228    incoming.write_all(&response_buf).await?;
229
230    let message = dns::parse_data_to_dns_message(&response_buf, true)?;
231    log_dns_message("DNS query via TCP", &domain, &message);
232
233    if opt.cache_records {
234        dns_cache_put_message(&cache, &message).await;
235    }
236
237    Ok(())
238}
239
240async fn tcp_via_socks5_server<A, B>(
241    proxy_addr: A,
242    target_server: B,
243    auth: Option<UserKey>,
244    buf: &[u8],
245    timeout: Duration,
246) -> Result<Vec<u8>>
247where
248    A: ToSocketAddrs,
249    B: Into<Address>,
250{
251    let s5_proxy = tokio::time::timeout(timeout, TcpStream::connect(proxy_addr)).await??;
252    let mut stream = BufStream::new(s5_proxy);
253    let _addr = tokio::time::timeout(timeout, client::connect(&mut stream, target_server, auth)).await??;
254
255    stream.write_all(buf).await?;
256    stream.flush().await?;
257
258    // Read the length prefix (2 bytes)
259    let mut len_buf = [0u8; 2];
260    tokio::time::timeout(timeout, stream.read_exact(&mut len_buf)).await??;
261    let len = u16::from_be_bytes(len_buf) as usize;
262
263    // Read the DNS message
264    let mut msg_buf = vec![0u8; len];
265    tokio::time::timeout(timeout, stream.read_exact(&mut msg_buf)).await??;
266
267    // Prepend the length prefix to match the expected format
268    let mut response_buf = len_buf.to_vec();
269    response_buf.extend(msg_buf);
270    Ok(response_buf)
271}
272
273fn log_dns_message(prefix: &str, domain: &str, message: &Message) {
274    let ipaddr = match dns::extract_ipaddr_from_dns_message(message) {
275        Ok(ipaddr) => {
276            format!("{:?}", ipaddr)
277        }
278        Err(e) => e.to_string(),
279    };
280    log::trace!("{} {:?} <==> {:?}", prefix, domain, ipaddr);
281}
282
283pub(crate) fn create_dns_cache() -> Cache<Vec<Query>, Message> {
284    Cache::builder()
285        .time_to_live(Duration::from_secs(30 * 60))
286        .time_to_idle(Duration::from_secs(5 * 60))
287        .build()
288}
289
290pub(crate) async fn dns_cache_get_message(cache: &Cache<Vec<Query>, Message>, message: &Message) -> Option<Message> {
291    if let Some(mut cached_message) = cache.get(&message.queries).await {
292        cached_message.metadata.id = message.metadata.id;
293        return Some(cached_message);
294    }
295    None
296}
297
298pub(crate) async fn dns_cache_put_message(cache: &Cache<Vec<Query>, Message>, message: &Message) {
299    cache.insert(message.queries.clone(), message.clone()).await;
300}