jets 0.4.3

A Rust rule-based tunnel
Documentation
pub mod socks4;
pub mod socks5;

use super::http::handle_tcp as http_handle_tcp;
use super::Inbound;
use crate::app::config::Account;
use crate::app::Context;
use crate::common::invalid_data_error;
use crate::transport::raw::{AcceptOpts, TcpListener, UdpSocket};
use async_trait::async_trait;
use futures::{future, FutureExt};
use socks4::Socks4aInbound;
use socks5::Socks5Inbound;
pub use socks5::Socks5Outbound;
use std::collections::HashMap;
use std::io::{Error, ErrorKind, Result};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[derive(Clone, Debug)]
pub struct SocksInbound {
    addr: SocketAddr,
    accounts: HashMap<String, String>,
    udp_enabled: bool,
    accept_opts: AcceptOpts,
}

impl SocksInbound {
    pub fn new(
        addr: SocketAddr,
        accounts: Vec<Account>,
        udp_enabled: bool,
        accept_opts: AcceptOpts,
    ) -> Self {
        let accounts: HashMap<_, _> = accounts.into_iter().map(|a| (a.user, a.pass)).collect();
        Self {
            addr,
            accounts,
            udp_enabled,
            accept_opts,
        }
    }
}

#[async_trait]
impl Inbound for SocksInbound {
    fn clone_box(&self) -> Box<dyn Inbound> {
        Box::new(self.clone())
    }

    async fn run(&self, context: Context, channel: Option<mpsc::Sender<String>>) -> Result<()> {
        let channel_clone = channel.clone();
        let run_tcp = async {
            let listener =
                TcpListener::bind_with_opts(&self.addr, self.accept_opts.clone()).await?;
            let addr = listener.local_addr()?;
            log::info!("Starting socks tcp server, listening on: {}", addr);
            if let Some(channel) = channel_clone {
                let _ = channel.send("socks".to_string()).await;
            }

            loop {
                let (stream, peer_addr) = match listener.accept().await {
                    Ok(s) => s,
                    Err(e) => {
                        log::error!("socks server {} accept failed with error: {}", addr, e);
                        sleep(Duration::from_secs(1)).await;
                        continue;
                    }
                };
                log::debug!("{} -> {}", peer_addr, addr);
                let context = context.clone();
                let addr = self.addr;
                let accounts = self.accounts.clone();
                let udp_enabled = self.udp_enabled;
                tokio::spawn(async move {
                    match handle_tcp(stream, context, addr, peer_addr, accounts, udp_enabled).await
                    {
                        Ok(_) => Ok(()),
                        Err(e) if e.kind() == ErrorKind::WouldBlock => {
                            log::info!("{} to inbound {} blocked: {:#}", peer_addr, addr, e);
                            Ok(())
                        }
                        Err(e) => {
                            log::error!("{} to Inbound {} failed: {:#}", peer_addr, addr, e);
                            Err(e)
                        }
                    }
                });
            }
        };

        let mut vfut = Vec::new();
        vfut.push(run_tcp.boxed());

        if self.udp_enabled {
            let run_udp = async {
                let socket =
                    UdpSocket::listen_with_opts(&self.addr, self.accept_opts.clone()).await?;
                let addr = socket.local_addr()?;
                log::info!("Starting socks5 udp server, listening on: {}", addr);
                if let Some(channel) = channel {
                    let _ = channel.send("socks_udp".to_string()).await;
                }
                let socket = Arc::new(socket);
                let socks5_inbound =
                    Socks5Inbound::new(self.addr, self.accounts.clone(), self.udp_enabled);
                socks5_inbound.handle_udp(socket, context.clone()).await
            };
            vfut.push(run_udp.boxed());
        }

        let (res, ..) = future::select_all(vfut).await;
        res
    }
}

pub async fn handle_tcp(
    stream: TcpStream,
    context: Context,
    addr: SocketAddr,
    peer_addr: SocketAddr,
    accounts: HashMap<String, String>,
    udp_enabled: bool,
) -> Result<()> {
    let mut version_buffer = [0u8; 1];
    let n = stream.peek(&mut version_buffer).await?;
    if n == 0 {
        return Err(Error::from(ErrorKind::UnexpectedEof));
    }

    match version_buffer[0] {
        0x04 => {
            if !accounts.is_empty() {
                Err(Error::other("SOCKS4 dose not support authentication"))
            } else {
                let socks4a_inbound = Socks4aInbound;
                socks4a_inbound.handle_tcp(stream, peer_addr, context).await
            }
        }
        0x05 => {
            let socks5_inbound = Socks5Inbound::new(addr, accounts, udp_enabled);
            socks5_inbound.handle_tcp(stream, peer_addr, context).await
        }
        b'G' | b'g' | b'H' | b'h' | b'P' | b'p' | b'D' | b'd' | b'C' | b'c' | b'O' | b'o'
        | b'T' | b't' => http_handle_tcp(stream, context, &accounts).await,
        version => Err(invalid_data_error(format!(
            "Socks version {:#x} is not supported",
            version
        ))),
    }
}