use crate::error::{HlsError, HlsResult};
use crate::*;
use std::fmt::{Display, Formatter};
use std::io;
use std::io::{ErrorKind, Read};
use std::net::{IpAddr, Shutdown};
use std::net::SocketAddr;
#[cfg(feature = "aync")]
use std::pin::Pin;
use std::str::FromStr;
#[cfg(feature = "aync")]
use std::task::{Context, Poll};
#[cfg(feature = "aync")]
use tokio::io::ReadBuf;
/// Describes how a connection to a peer is routed: directly, through an
/// HTTP proxy, or through a SOCKS5 proxy.
#[derive(Clone, Debug)]
pub enum Proxy {
/// No proxy: the peer's own address is used as the socket address and no
/// handshake bytes are written.
Null,
/// HTTP proxy. The `Url` supplies the proxy address and, when both
/// username and password are non-empty, `Basic` credentials for the
/// `CONNECT` request.
HttpPlain(Url),
/// SOCKS5 proxy. The `Url` supplies the proxy address and, when both
/// username and password are non-empty, username/password credentials.
Socks5(Url),
}
impl Proxy {
    /// Creates an HTTP proxy description that tunnels through `host:port`.
    pub fn new_http_plain(host: impl ToString, port: u16) -> Proxy {
        let mut url = Url::default();
        url.set_addr(Addr::new_addr(host, port));
        url.set_scheme(Scheme::Http);
        Proxy::HttpPlain(url)
    }

    /// Creates a SOCKS5 proxy description pointing at `host:port`.
    pub fn new_socks5(host: impl ToString, port: u16) -> Proxy {
        let mut url = Url::default();
        url.set_addr(Addr::new_addr(host, port));
        url.set_scheme(Scheme::Socks5);
        Proxy::Socks5(url)
    }

    /// Converts the length of a SOCKS5 length-prefixed field to its one-byte
    /// wire form, failing with `err` instead of silently truncating values
    /// above 255 (which would desynchronise the whole frame).
    fn socks5_field_len(len: usize, err: &'static str) -> HlsResult<u8> {
        if len > usize::from(u8::MAX) {
            return Err(err.into());
        }
        // Lossless: the guard above bounds `len` to 0..=255.
        Ok(len as u8)
    }

    /// Writes step `index` of the proxy handshake for `peer_addr` into `writer`.
    ///
    /// * `Null` writes nothing.
    /// * `HttpPlain` writes a complete `CONNECT` request (with a
    ///   `Proxy-Authorization: Basic` header when both username and password
    ///   are non-empty) regardless of `index`.
    /// * `Socks5` writes one message per step: `0` the method-selection
    ///   greeting, `1` the username/password request (only when credentials
    ///   are configured), `2` the `CONNECT` request.
    ///
    /// Returns `Ok(true)` when this was the final step of the handshake and
    /// `Ok(false)` when further steps remain.
    ///
    /// # Errors
    /// Propagates writer and base64 errors, and fails when a SOCKS5 username,
    /// password or host name is longer than the 255 bytes its length prefix
    /// can express.
    fn write_context<W: WriteExt>(&self, peer_addr: &Addr, writer: &mut W, index: usize) -> HlsResult<bool> {
        match self {
            Proxy::Null => Ok(true),
            Proxy::HttpPlain(v) => {
                let peer_addr = peer_addr.to_string();
                writer.write_slice(b"CONNECT ")?;
                writer.write_slice(peer_addr.as_bytes())?;
                writer.write_slice(b" HTTP/1.1\r\n")?;
                writer.write_slice(b"Host: ")?;
                writer.write_slice(peer_addr.as_bytes())?;
                writer.write_slice(b"\r\n")?;
                if !v.username().is_empty() && !v.password().is_empty() {
                    writer.write_slice(b"Proxy-Authorization: Basic ")?;
                    let auth = base64::b64encode(format!("{}:{}", v.username(), v.password()))?;
                    writer.write_slice(auth.as_bytes())?;
                    writer.write_slice(b"\r\n")?;
                }
                writer.write_slice(b"Proxy-Connection: Keep-Alive\r\n\r\n")?;
                Ok(true)
            }
            Proxy::Socks5(v) => {
                // Credentials are only used when both parts are present.
                let anonymous = v.username().is_empty() || v.password().is_empty();
                match index {
                    0 => {
                        // VER=5, NMETHODS=1, METHOD=0 (none) or 2 (username/password).
                        let method: u8 = if anonymous { 0 } else { 2 };
                        writer.write_slice(&[5, 1, method])?;
                        Ok(false)
                    }
                    1 => {
                        if !anonymous {
                            // Validate both lengths before emitting anything so a
                            // failure never leaves a half-written message behind.
                            let user_len = Self::socks5_field_len(v.username().len(), "socks5 username too long")?;
                            let pass_len = Self::socks5_field_len(v.password().len(), "socks5 password too long")?;
                            // Sub-negotiation: VER=1, ULEN, UNAME, PLEN, PASSWD.
                            writer.write_u8(1)?;
                            writer.write_u8(user_len)?;
                            writer.write_slice(v.username().as_bytes())?;
                            writer.write_u8(pass_len)?;
                            writer.write_slice(v.password().as_bytes())?;
                        }
                        Ok(false)
                    }
                    2 => {
                        match IpAddr::from_str(peer_addr.host()) {
                            Ok(IpAddr::V4(v4)) => {
                                // VER=5, CMD=1 (CONNECT), RSV=0, ATYP=1 (IPv4).
                                writer.write_slice(&[5, 1, 0, 1])?;
                                writer.write_slice(&v4.octets())?;
                            }
                            Ok(IpAddr::V6(v6)) => {
                                // ATYP=4 marks a 16-byte IPv6 address.
                                writer.write_slice(&[5, 1, 0, 4])?;
                                writer.write_slice(&v6.octets())?;
                            }
                            Err(_) => {
                                // Not an IP literal: send it as a domain name (ATYP=3).
                                let host_len = Self::socks5_field_len(peer_addr.host().len(), "socks5 host name too long")?;
                                writer.write_slice(&[5, 1, 0, 3])?;
                                writer.write_u8(host_len)?;
                                writer.write_slice(peer_addr.host().as_bytes())?;
                            }
                        }
                        writer.write_u16(peer_addr.port())?;
                        Ok(true)
                    }
                    _ => Ok(false),
                }
            }
        }
    }

    /// Returns `true` when no proxy is configured.
    pub fn is_null(&self) -> bool {
        matches!(self, Proxy::Null)
    }

    /// Returns the socket address to connect to: the peer itself for `Null`,
    /// otherwise the proxy's address. `ech` is forwarded unchanged to the
    /// address lookup.
    pub fn socket_addr(&self, peer_addr: &Addr, ech: bool) -> HlsResult<SocketAddr> {
        match self {
            Proxy::Null => Ok(peer_addr.socket_addr(ech)?),
            Proxy::HttpPlain(url) | Proxy::Socks5(url) => Ok(url.addr().socket_addr(ech)?),
        }
    }
}
impl Display for Proxy {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Proxy::Null => f.write_str("Null"),
Proxy::HttpPlain(url) => write!(f, "HttpPlain({})", url),
Proxy::Socks5(url) => write!(f, "Socks5({})", url),
}
}
}
impl TryFrom<&str> for Proxy {
    type Error = HlsError;

    /// Parses `value` as a URL and picks the proxy kind from its scheme.
    ///
    /// # Errors
    /// Fails when the URL cannot be parsed or when its scheme is neither
    /// HTTP nor SOCKS5.
    fn try_from(value: &str) -> Result<Self, Self::Error> {
        let parsed = Url::try_from(value)?;
        if matches!(parsed.protocol(), Scheme::Http) {
            return Ok(Proxy::HttpPlain(parsed));
        }
        if matches!(parsed.protocol(), Scheme::Socks5) {
            return Ok(Proxy::Socks5(parsed));
        }
        Err("unsupported proxy scheme".into())
    }
}
impl TryFrom<String> for Proxy {
type Error = HlsError;
fn try_from(value: String) -> Result<Self, Self::Error> {
Proxy::try_from(value.as_str())
}
}
/// A transport `S` that is connected either straight to the peer or to a
/// proxy, together with the state needed to consume the proxy's handshake
/// reply before payload data is exchanged.
pub struct ProxyStream<S> {
/// The underlying connected transport.
stream: S,
/// `true` once the proxy's handshake reply has been dealt with; initialised
/// to `true` when no proxy is configured.
handle_proxy: bool,
/// `true` when the proxy is `Proxy::HttpPlain`, in which case the reply is
/// parsed as an HTTP response; otherwise it is treated as a SOCKS5 reply.
http_proxy: bool,
/// Scratch buffer used to build the handshake request and to receive the
/// proxy's reply.
buffer: Buffer,
/// Parser state for the HTTP proxy's response to `CONNECT`.
resp: Response,
}
impl<S> ProxyStream<S> {
    /// Gives mutable access to the wrapped transport.
    pub fn stream_mut(&mut self) -> &mut S {
        let Self { stream, .. } = self;
        stream
    }
}
impl ProxyStream<std::net::TcpStream> {
    /// Opens a TCP connection to `addr` within the connect timeout and
    /// applies the read and write timeouts to the new socket.
    fn create_sync(addr: &SocketAddr, timeout: &Timeout) -> HlsResult<std::net::TcpStream> {
        let socket = std::net::TcpStream::connect_timeout(addr, timeout.connect())?;
        socket.set_read_timeout(Some(timeout.read()))?;
        socket.set_write_timeout(Some(timeout.write()))?;
        Ok(socket)
    }

    /// Connects to `peer_addr`, going through `proxy` when one is configured,
    /// and sends every handshake message up front.
    ///
    /// The proxy's replies are not read here; they are consumed by the first
    /// `read`/`write` on the returned stream.
    pub fn sync_connect(proxy: &Proxy, peer_addr: &Addr, timeout: &Timeout, ech: bool) -> HlsResult<ProxyStream<std::net::TcpStream>> {
        #[cfg(feature = "log")]
        debug!("[ProxyStream] Proxy: {} | PeerAddr: {}",proxy,peer_addr);
        let target = proxy.socket_addr(peer_addr, ech)?;
        let mut stream = Self::create_sync(&target, timeout)?;
        let mut buffer = Buffer::with_capacity(1024);
        for step in 0..4 {
            buffer.reset();
            let done = proxy.write_context(peer_addr, &mut buffer, step)?;
            // A step may produce no bytes (e.g. no proxy, or no credentials);
            // such steps are skipped without ending the loop.
            if !buffer.is_empty() {
                io::Write::write_all(&mut stream, buffer.filled())?;
                if done {
                    break;
                }
            }
        }
        buffer.reset();
        let handle_proxy = proxy.is_null();
        let http_proxy = matches!(proxy, Proxy::HttpPlain(_));
        Ok(ProxyStream {
            stream,
            handle_proxy,
            http_proxy,
            buffer,
            resp: Response::new(),
        })
    }

    /// Shuts down both halves of the underlying socket.
    pub fn shutdown(&mut self) -> HlsResult<()> {
        Ok(self.stream.shutdown(Shutdown::Both)?)
    }

    /// Performs one socket read, retrying when the call is interrupted.
    ///
    /// Reads into `buf` when given, otherwise into the unfilled part of the
    /// internal buffer (the caller is then responsible for recording the
    /// returned length on the buffer).
    fn sync_read(&mut self, buf: Option<&mut [u8]>) -> io::Result<usize> {
        let buf = buf.unwrap_or(self.buffer.unfilled());
        loop {
            match self.stream.read(buf) {
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                outcome => return outcome,
            }
        }
    }
}
impl ProxyStream<std::net::TcpStream> {
    /// Measures the SOCKS5 handshake reply at the start of `data`.
    ///
    /// The reply is the method-selection message, followed by the
    /// username/password status when method `2` was selected, followed by
    /// the `CONNECT` reply whose length depends on its address type.
    ///
    /// Returns `Ok(Some(n))` when the first `n` bytes of `data` are a
    /// complete, successful reply, and `Ok(None)` when more bytes are needed.
    ///
    /// # Errors
    /// Fails when the proxy accepted none of the offered methods, rejected
    /// the credentials, or reported a non-zero `CONNECT` status.
    fn socks5_reply_len(data: &[u8]) -> io::Result<Option<usize>> {
        // Method selection: VER, METHOD.
        if data.len() < 2 {
            return Ok(None);
        }
        let mut pos = 2;
        match data[1] {
            0xFF => return Err(io::Error::other("socks5 no acceptable auth method")),
            2 => {
                // Username/password status: VER, STATUS.
                if data.len() < 4 {
                    return Ok(None);
                }
                if data[3] != 0 {
                    return Err(io::Error::other("socks5 auth fail"));
                }
                pos = 4;
            }
            _ => {}
        }
        // CONNECT reply: VER, REP, RSV, ATYP, BND.ADDR, BND.PORT.
        let reply = &data[pos..];
        if reply.len() < 2 {
            return Ok(None);
        }
        if reply[1] != 0 {
            return Err(io::Error::other(format!("socks5 connect fail-{}", reply[1])));
        }
        if reply.len() < 4 {
            return Ok(None);
        }
        let addr_len = match reply[3] {
            4 => 16,
            3 => match reply.get(4) {
                Some(&name_len) => 1 + usize::from(name_len),
                None => return Ok(None),
            },
            // IPv4, and the lenient fallback for unknown types: this keeps the
            // previously assumed 10-byte reply size.
            _ => 4,
        };
        let total = pos + 4 + addr_len + 2;
        Ok(if data.len() >= total { Some(total) } else { None })
    }
}

impl Read for ProxyStream<std::net::TcpStream> {
    /// Reads payload bytes, first consuming the proxy's handshake reply if
    /// that has not happened yet.
    ///
    /// Bytes that arrived together with the handshake reply are kept in the
    /// internal buffer and handed out, as many as fit into `buf`, before the
    /// socket is read again. Calling with an empty `buf` only drives the
    /// handshake and returns `Ok(0)`.
    ///
    /// # Errors
    /// Fails when the proxy closes the connection during the handshake, when
    /// an HTTP proxy answers with a status other than 200, when a SOCKS5
    /// proxy rejects the authentication or the connect request, or on any
    /// socket error.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.handle_proxy {
            // NOTE(review): the flag is raised before the reply is parsed, so a
            // failed handshake is not retried; callers are presumably expected
            // to drop the stream on error — confirm.
            self.handle_proxy = true;
            self.buffer.reset();
            if self.http_proxy {
                loop {
                    let len = self.sync_read(None)?;
                    if len == 0 { return Err(io::Error::other(HlsError::PeerClosedConnection)); }
                    self.buffer.add_len(len);
                    if self.resp.extend_buffer(&mut self.buffer)? { break; }
                }
                let status = self.resp.header().status().code();
                if status != 200 { return Err(io::Error::other(format!("connect http proxy error-{}", status))); }
            } else {
                // Keep reading until the whole SOCKS5 reply is present; TCP may
                // deliver it in arbitrary pieces.
                loop {
                    let len = self.sync_read(None)?;
                    if len == 0 { return Err(io::Error::other(HlsError::PeerClosedConnection)); }
                    self.buffer.add_len(len);
                    if let Some(consumed) = Self::socks5_reply_len(self.buffer.filled())? {
                        self.buffer.used_empty(consumed);
                        break;
                    }
                }
            }
            // A zero-length read only asked for the handshake to be completed.
            if buf.is_empty() {
                return Ok(0);
            }
        }
        if !self.buffer.is_empty() {
            // Hand out bytes left over from the handshake, never more than the
            // caller has room for; the rest stays buffered for the next read.
            let n = self.buffer.len().min(buf.len());
            buf[..n].copy_from_slice(&self.buffer.filled()[..n]);
            self.buffer.used_empty(n);
            return Ok(n);
        }
        self.sync_read(Some(buf))
    }
}
impl io::Write for ProxyStream<std::net::TcpStream> {
    /// Writes `buf` to the socket, first consuming the proxy's handshake
    /// reply (via a zero-length read) if that has not happened yet.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if !self.handle_proxy {
            let mut nothing = [0u8; 0];
            Read::read(self, &mut nothing)?;
        }
        <std::net::TcpStream as io::Write>::write(&mut self.stream, buf)
    }

    /// Flushes the underlying socket.
    fn flush(&mut self) -> io::Result<()> {
        <std::net::TcpStream as io::Write>::flush(&mut self.stream)
    }
}
#[cfg(feature = "aync")]
impl ProxyStream<tokio::net::TcpStream> {
    /// Connects to `peer_addr`, going through `proxy` when one is configured,
    /// and sends every handshake message up front.
    ///
    /// The proxy's replies are not read here; they are consumed by the first
    /// read or write on the returned stream. No timeout is applied by this
    /// function.
    pub async fn async_connect(proxy: &Proxy, peer_addr: &Addr, ech: bool) -> HlsResult<ProxyStream<tokio::net::TcpStream>> {
        #[cfg(feature = "log")]
        debug!("[ProxyStream] Proxy: {} | PeerAddr: {}",proxy,peer_addr);
        // The lookup duration is reported through the logger instead of being
        // printed to stdout, so library users without logging see no output.
        // NOTE(review): `socket_addr` is a synchronous call and presumably
        // resolves names; confirm it cannot stall the async executor.
        #[cfg(feature = "log")]
        let lookup_started = Time::now_mills();
        let addr = proxy.socket_addr(peer_addr, ech)?;
        #[cfg(feature = "log")]
        debug!("[ProxyStream] DNS TIME: {}", Time::now_mills() - lookup_started);
        let mut stream = tokio::net::TcpStream::connect(addr).await?;
        let mut buffer = Buffer::with_capacity(1024);
        for i in 0..4 {
            buffer.reset();
            let finish = proxy.write_context(peer_addr, &mut buffer, i)?;
            // Steps that produce no bytes are skipped without ending the loop.
            if buffer.is_empty() { continue; }
            tokio::io::AsyncWriteExt::write_all(&mut stream, buffer.filled()).await?;
            if finish { break; }
        }
        buffer.reset();
        Ok(ProxyStream {
            stream,
            handle_proxy: matches!(proxy,Proxy::Null),
            http_proxy: matches!(proxy, Proxy::HttpPlain(_)),
            buffer,
            resp: Response::new(),
        })
    }
}
#[cfg(feature = "aync")]
impl ProxyStream<tokio::net::TcpStream> {
    /// Measures the SOCKS5 handshake reply at the start of `data`.
    ///
    /// The reply is the method-selection message, followed by the
    /// username/password status when method `2` was selected, followed by
    /// the `CONNECT` reply whose length depends on its address type.
    ///
    /// Returns `Ok(Some(n))` when the first `n` bytes of `data` are a
    /// complete, successful reply, and `Ok(None)` when more bytes are needed.
    ///
    /// # Errors
    /// Fails when the proxy accepted none of the offered methods, rejected
    /// the credentials, or reported a non-zero `CONNECT` status.
    fn socks5_reply_len(data: &[u8]) -> io::Result<Option<usize>> {
        // Method selection: VER, METHOD.
        if data.len() < 2 {
            return Ok(None);
        }
        let mut pos = 2;
        match data[1] {
            0xFF => return Err(io::Error::other("socks5 no acceptable auth method")),
            2 => {
                // Username/password status: VER, STATUS.
                if data.len() < 4 {
                    return Ok(None);
                }
                if data[3] != 0 {
                    return Err(io::Error::other("socks5 auth fail"));
                }
                pos = 4;
            }
            _ => {}
        }
        // CONNECT reply: VER, REP, RSV, ATYP, BND.ADDR, BND.PORT.
        let reply = &data[pos..];
        if reply.len() < 2 {
            return Ok(None);
        }
        if reply[1] != 0 {
            return Err(io::Error::other(format!("socks5 connect fail-{}", reply[1])));
        }
        if reply.len() < 4 {
            return Ok(None);
        }
        let addr_len = match reply[3] {
            4 => 16,
            3 => match reply.get(4) {
                Some(&name_len) => 1 + usize::from(name_len),
                None => return Ok(None),
            },
            // IPv4, and the lenient fallback for unknown types: this keeps the
            // previously assumed 10-byte reply size.
            _ => 4,
        };
        let total = pos + 4 + addr_len + 2;
        Ok(if data.len() >= total { Some(total) } else { None })
    }
}

#[cfg(feature = "aync")]
impl tokio::io::AsyncRead for ProxyStream<tokio::net::TcpStream> {
    /// Polls for payload bytes, first consuming the proxy's handshake reply
    /// if that has not happened yet.
    ///
    /// Bytes that arrived together with the handshake reply are kept in the
    /// internal buffer and handed out, as many as fit into `buf`, before the
    /// socket is polled again. Polling with a zero-capacity `buf` only drives
    /// the handshake.
    ///
    /// Fails when the proxy closes the connection during the handshake, when
    /// an HTTP proxy answers with a status other than 200, when a SOCKS5
    /// proxy rejects the authentication or the connect request, or on any
    /// socket error.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        if !this.handle_proxy {
            if this.http_proxy {
                loop {
                    let mut pb = ReadBuf::new(this.buffer.unfilled());
                    match Pin::new(&mut this.stream).poll_read(cx, &mut pb) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                        Poll::Ready(Ok(())) => {
                            let rl = pb.filled().len();
                            if rl == 0 { return Poll::Ready(Err(HlsError::PeerClosedConnection.into())); }
                            this.buffer.add_len(rl);
                            let finished = this.resp.extend_buffer(&mut this.buffer)?;
                            if finished { break; }
                        }
                    }
                }
                let status = this.resp.header().status();
                if status.code() != 200 { return Poll::Ready(Err(io::Error::other(format!("connect http proxy fail-{}", status.code())))); }
            } else {
                // Keep reading until the whole SOCKS5 reply is present; received
                // bytes stay in the buffer across `Pending` returns.
                loop {
                    let mut pb = ReadBuf::new(this.buffer.unfilled());
                    match Pin::new(&mut this.stream).poll_read(cx, &mut pb) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                        Poll::Ready(Ok(())) => {
                            let rl = pb.filled().len();
                            if rl == 0 { return Poll::Ready(Err(io::Error::other(HlsError::PeerClosedConnection))); }
                            this.buffer.add_len(rl);
                            if let Some(consumed) = Self::socks5_reply_len(this.buffer.filled())? {
                                this.buffer.used_empty(consumed);
                                break;
                            }
                        }
                    }
                }
            }
            this.handle_proxy = true;
            // A zero-capacity read only asked for the handshake to be completed.
            if buf.remaining() == 0 {
                return Poll::Ready(Ok(()));
            }
        }
        if !this.buffer.is_empty() {
            // Hand out bytes left over from the handshake, never more than the
            // caller has room for; the rest stays buffered for the next poll.
            let n = this.buffer.len().min(buf.remaining());
            buf.put_slice(&this.buffer.filled()[..n]);
            this.buffer.used_empty(n);
            return Poll::Ready(Ok(()));
        }
        // Completing with zero bytes would read as end-of-stream to the caller,
        // so go on to poll the socket for payload.
        Pin::new(&mut this.stream).poll_read(cx, buf)
    }
}
#[cfg(feature = "aync")]
impl tokio::io::AsyncWrite for ProxyStream<tokio::net::TcpStream> {
    /// Writes `buf` to the socket, first driving the proxy handshake to
    /// completion (via a zero-capacity read) if that has not happened yet.
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
        if !self.handle_proxy {
            let mut nothing = [0u8; 0];
            let mut probe = ReadBuf::new(&mut nothing);
            // Returns early on `Pending` and on error; continues once the
            // handshake reply has been consumed.
            std::task::ready!(tokio::io::AsyncRead::poll_read(self.as_mut(), cx, &mut probe))?;
        }
        let this = self.get_mut();
        Pin::new(&mut this.stream).poll_write(cx, buf)
    }

    /// Flushes the underlying socket.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let this = self.get_mut();
        Pin::new(&mut this.stream).poll_flush(cx)
    }

    /// Shuts down the write side of the underlying socket.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let this = self.get_mut();
        Pin::new(&mut this.stream).poll_shutdown(cx)
    }
}