use super::proxy::{socks5, socks5_config::random_auth};
use multiaddr::{MultiAddr, Protocol};
pub use tokio::{
net::{TcpListener, TcpStream},
spawn,
task::{JoinHandle, block_in_place, spawn_blocking, yield_now},
};
use crate::{
service::config::{TcpSocket, TcpSocketConfig, TcpSocketTransformer, TransformerContext},
utils::redact_auth_from_url,
};
use socket2::{Domain, Protocol as SocketProtocol, Socket, Type};
#[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
use std::{io, net::SocketAddr};
use tokio::net::TcpSocket as TokioTcp;
#[cfg(feature = "tokio-timer")]
pub use {
time::{Interval, interval},
tokio::time::{MissedTickBehavior, Sleep as Delay, Timeout, sleep as delay_for, timeout},
};
#[cfg(feature = "tokio-timer")]
mod time {
use futures::Stream;
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::{
Instant, Interval as Inner, MissedTickBehavior, interval_at as inner_interval,
};
pub struct Interval(Inner);
impl Interval {
pub fn new(period: Duration) -> Self {
Self::new_at(Duration::ZERO, period)
}
pub fn new_at(start_since_now: Duration, period: Duration) -> Self {
Self(inner_interval(Instant::now() + start_since_now, period))
}
pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
self.0.set_missed_tick_behavior(behavior);
}
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
match self.0.poll_tick(cx) {
Poll::Ready(_) => Poll::Ready(Some(())),
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(usize::MAX, None)
}
}
pub fn interval(period: Duration) -> Interval {
Interval::new(period)
}
}
pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Result<TcpListener> {
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?;
let socket = {
#[cfg(not(windows))]
socket.set_reuse_address(true)?;
let transformer_context = TransformerContext::new_listen(addr);
let t = (tcp_config.socket_transformer)(TcpSocket { inner: socket }, transformer_context)?;
t.inner.set_nonblocking(true)?;
unsafe {
#[cfg(unix)]
let socket = TokioTcp::from_raw_fd(t.into_raw_fd());
#[cfg(windows)]
let socket = TokioTcp::from_raw_socket(t.into_raw_socket());
socket
}
};
if let Err(e) = socket.bind(addr) {
if Some(22) != e.raw_os_error() {
return Err(e);
}
}
socket.listen(1024)
}
async fn connect_direct(
addr: SocketAddr,
socket_transformer: TcpSocketTransformer,
) -> io::Result<TcpStream> {
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?;
let socket = {
let transformer_context = TransformerContext::new_dial(addr);
let t = socket_transformer(TcpSocket { inner: socket }, transformer_context)?;
t.inner.set_nonblocking(true)?;
unsafe {
#[cfg(unix)]
let socket = TokioTcp::from_raw_fd(t.into_raw_fd());
#[cfg(windows)]
let socket = TokioTcp::from_raw_socket(t.into_raw_socket());
socket
}
};
socket.connect(addr).await
}
async fn connect_by_proxy(
target_addr: String,
target_port: u16,
mut proxy_server_url: url::Url,
proxy_random_auth: bool,
) -> io::Result<TcpStream> {
if proxy_random_auth {
if proxy_server_url.username().is_empty() {
let (random_username, random_passwd) = random_auth();
proxy_server_url
.set_username(&random_username)
.map_err(|_| io::Error::other("failed to set username"))?;
proxy_server_url
.set_password(Some(&random_passwd))
.map_err(|_| io::Error::other("failed to set password"))?;
} else {
}
}
socks5::connect(proxy_server_url.clone(), target_addr.clone(), target_port)
.await
.map_err(|err| {
io::Error::other(
format!(
"socks5_connect to target_addr: {}, target_port: {} by proxy_server: {} failed, err: {}",
target_addr, target_port, redact_auth_from_url(&proxy_server_url), err
),
)
})
}
pub(crate) async fn connect(
target_addr: SocketAddr,
tcp_config: TcpSocketConfig,
) -> io::Result<TcpStream> {
let TcpSocketConfig {
socket_transformer,
proxy_url,
onion_url: _,
proxy_random_auth,
} = tcp_config;
match proxy_url {
Some(proxy_url) => connect_by_proxy(
target_addr.ip().to_string(),
target_addr.port(),
proxy_url.clone(),
proxy_random_auth,
)
.await
.map_err(|err| {
io::Error::other(format!("connect_by_proxy: {}, error: {}", proxy_url, err))
}),
None => connect_direct(target_addr, socket_transformer).await,
}
}
pub(crate) async fn connect_onion(
onion_addr: MultiAddr,
tcp_config: TcpSocketConfig,
) -> io::Result<TcpStream> {
let TcpSocketConfig {
socket_transformer: _,
proxy_url,
onion_url,
proxy_random_auth,
} = tcp_config;
let tor_server_url = onion_url.or(proxy_url).ok_or(io::Error::other(
"need tor proxy server to connect to onion address",
))?;
let onion_protocol = onion_addr
.iter()
.find_map(|protocol| {
if let Protocol::Onion3(onion_address) = protocol {
Some(onion_address)
} else {
None
}
})
.ok_or(io::Error::other(format!(
"No Onion3 address found. in {}",
onion_addr
)))?;
let onion_str = onion_protocol.hash_string() + ".onion";
let onion_port = onion_protocol.port();
connect_by_proxy(onion_str, onion_port, tor_server_url, proxy_random_auth).await
}