//! kcptun-rust 1.1.0
//!
//! A Rust implementation of kcptun, a fast and reliable tunnel based on the KCP protocol.
//! Documentation
use anyhow::Result;
use clap::Parser;
use kcptun_rust::{CompStream, ServerConfig, create_block_crypt, derive_key, wrap_with_qpp};
use rust_tokio_kcp::{KcpConfig, KcpListener, KcpNoDelayConfig, KcpStream};
use smux_rust::{server, Config as SmuxConfig, Stream};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::{debug, error, info};

// Command-line arguments for the kcptun server.
//
// Defaults mirror the upstream Go kcptun server, including the
// intentionally misspelled default key "it's a secrect".
//
// NOTE(review): plain `//` comments are used on fields instead of `///`
// because clap-derive turns doc comments into --help text, which would
// change the program's CLI output.
#[derive(Parser)]
#[command(name = "kcptun-server")]
// NOTE(review): version here is "0.1.0" but the file header says 1.1.0 — confirm.
#[command(about = "KCP tunnel server", version = "0.1.0")]
struct Args {
    // Address to listen on for incoming KCP connections (host:port).
    #[arg(short = 'l', long, default_value = ":29900")]
    listen: String,
    
    // TCP address that accepted streams are proxied to.
    #[arg(short = 't', long, default_value = "127.0.0.1:12948")]
    target: String,
    
    // Pre-shared secret used to derive the encryption key.
    #[arg(long, default_value = "it's a secrect")]
    key: String,
    
    // Block cipher name (e.g. "aes"); passed to create_block_crypt.
    #[arg(long, default_value = "aes")]
    crypt: String,
    
    // Latency/throughput preset; expanded into nodelay/interval/resend/nc
    // by ServerConfig::apply_mode.
    #[arg(long, default_value = "fast")]
    mode: String,
    
    // Maximum transmission unit for KCP packets, in bytes.
    #[arg(long, default_value = "1350")]
    mtu: u32,
    
    // Outbound rate limit; 0 disables limiting.
    #[arg(long, default_value = "0")]
    ratelimit: u32,
    
    // KCP send window size, in packets.
    #[arg(long, default_value = "1024")]
    sndwnd: u32,
    
    // KCP receive window size, in packets.
    #[arg(long, default_value = "1024")]
    rcvwnd: u32,
    
    // Reed-Solomon FEC data shard count.
    #[arg(long, default_value = "10")]
    datashard: u32,
    
    // Reed-Solomon FEC parity shard count.
    #[arg(long, default_value = "3")]
    parityshard: u32,
    
    // DSCP value for outgoing packets (QoS marking); 0 = unset.
    #[arg(long, default_value = "0")]
    dscp: u32,
    
    // Disable the Snappy compression layer when set.
    #[arg(long)]
    nocomp: bool,
    
    // Flush ACKs immediately instead of batching them.
    #[arg(long)]
    acknodelay: bool,
    
    // SMUX protocol version.
    #[arg(long, default_value = "2")]
    smuxver: u32,
    
    // SMUX session receive buffer size, in bytes.
    #[arg(long, default_value = "4194304")]
    smuxbuf: u32,
    
    // Maximum SMUX frame size, in bytes.
    #[arg(long, default_value = "8192")]
    framesize: u32,
    
    // Per-stream SMUX buffer size, in bytes.
    #[arg(long, default_value = "2097152")]
    streambuf: u32,
    
    // SMUX keepalive interval, in seconds.
    #[arg(long, default_value = "10")]
    keepalive: u32,
    
    // Grace period before a closing session is torn down, in seconds.
    #[arg(long, default_value = "30")]
    closewait: u32,
    
    // Optional JSON config file; when given, it overrides all CLI flags.
    #[arg(short = 'c', long)]
    config: Option<String>,
    
    // Suppress per-stream open/close and error logging.
    #[arg(long)]
    quiet: bool,
    
    // Enable QPP (quantum permutation pad) obfuscation on stream payloads.
    #[arg(long)]
    qpp: bool,
    
    // Number of QPP pads to generate; relevant only when --qpp is set.
    #[arg(long, default_value = "61")]
    qpp_count: u32,
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
        )
        .init();
    
    let args = Args::parse();
    
    let config = if let Some(config_path) = args.config {
        ServerConfig::from_json(config_path)?
    } else {
        let mut cfg = ServerConfig {
            listen: args.listen,
            target: args.target,
            key: args.key,
            crypt: args.crypt,
            mode: args.mode,
            mtu: args.mtu,
            ratelimit: args.ratelimit,
            sndwnd: args.sndwnd,
            rcvwnd: args.rcvwnd,
            datashard: args.datashard,
            parityshard: args.parityshard,
            dscp: args.dscp,
            nocomp: args.nocomp,
            acknodelay: args.acknodelay,
            nodelay: 0,
            interval: 50,
            resend: 0,
            nc: 0,
            sockbuf: 4194304,
            smuxver: args.smuxver,
            smuxbuf: args.smuxbuf,
            framesize: args.framesize,
            streambuf: args.streambuf,
            keepalive: args.keepalive,
            log: String::new(),
            snmplog: String::new(),
            snmpperiod: 60,
            pprof: false,
            quiet: args.quiet,
            tcp: false,
            qpp: args.qpp,
            qpp_count: args.qpp_count,
            closewait: args.closewait,
        };
        cfg.apply_mode();
        cfg
    };
    
    info!("version: 0.1.0");
    info!("listening on: {}", config.listen);
    info!("target: {}", config.target);
    info!("encryption: {}", config.crypt);
    info!("mode: {}", config.mode);
    info!("nodelay parameters: {} {} {} {}", 
          config.nodelay, config.interval, config.resend, config.nc);
    info!("sndwnd: {}, rcvwnd: {}", config.sndwnd, config.rcvwnd);
    info!("compression: {}", !config.nocomp);
    info!("mtu: {}", config.mtu);
    info!("datashard: {}, parityshard: {}", config.datashard, config.parityshard);
    
    let key = derive_key(&config.key);
    
    // Parse listen address
    let listen_addr: std::net::SocketAddr = config.listen.parse()?;
    
    // Create block cipher
    let block_crypt = create_block_crypt(&config.crypt, &key)?;
    
    // Configure KCP
    let kcp_config = KcpConfig {
        mtu: config.mtu as usize,
        nodelay: KcpNoDelayConfig {
            nodelay: config.nodelay != 0,
            interval: config.interval as i32,
            resend: config.resend as i32,
            nc: config.nc != 0,
        },
        wnd_size: (config.sndwnd as u16, config.rcvwnd as u16),
        stream: true,
        flush_write: false,
        flush_acks_input: config.acknodelay,
        fec_data_shards: config.datashard as usize,
        fec_parity_shards: config.parityshard as usize,
        crypt: block_crypt,
        ..Default::default()
    };
    
    let config_arc = Arc::new(config);
    
    // Start KCP listener
    let mut listener = KcpListener::bind(kcp_config, listen_addr).await?;
    info!("Listening on {}", listen_addr);
    info!("Waiting for connections...");
    
    loop {
        info!("Calling listener.accept()...");
        match listener.accept().await {
            Ok((kcp_stream, peer_addr)) => {
                info!("Accepted connection from {}", peer_addr);
                
                let config_clone = config_arc.clone();
                
                tokio::spawn(async move {
                    if let Err(e) = handle_connection(kcp_stream, &config_clone).await {
                        error!("Error handling connection: {}", e);
                    }
                });
            }
            Err(e) => {
                error!("Error accepting connection: {}", e);
            }
        }
    }
}

async fn handle_connection(
    kcp_stream: KcpStream,
    config: &ServerConfig,
) -> Result<()> {
    // Create SMUX session
    let smux_config = SmuxConfig {
        version: config.smuxver as u8,
        keep_alive_disabled: false,
        keep_alive_interval: Duration::from_secs(config.keepalive as u64),
        keep_alive_timeout: Duration::from_secs(config.keepalive as u64 * 3),
        max_frame_size: config.framesize as usize,
        max_receive_buffer: config.smuxbuf as usize,
        max_stream_buffer: config.streambuf as usize,
    };
    
    // 根据 nocomp 配置决定是否使用压缩层
    let session = if config.nocomp {
        // 不使用压缩,直接使用 KCP stream
        server(Box::new(kcp_stream), Some(smux_config)).await?
    } else {
        // 使用 Snappy 压缩
        let comp_stream = CompStream::new(kcp_stream);
        server(Box::new(comp_stream), Some(smux_config)).await?
    };
    
    loop {
        match session.accept_stream().await {
            Ok(stream) => {
                let target = config.target.clone();
                let quiet = config.quiet;
                let qpp = config.qpp;
                let qpp_count = config.qpp_count;
                let key = config.key.clone();
                tokio::spawn(async move {
                    if let Err(e) = handle_stream(stream, &target, quiet, qpp, qpp_count, &key).await
                    {
                        if !quiet {
                            error!("Error handling stream: {}", e);
                        }
                    }
                });
            }
            Err(e) => {
                error!("Failed to accept stream: {}", e);
                break;
            }
        }
    }
    
    Ok(())
}

/// Proxy one SMUX stream to a fresh TCP connection to `target`.
///
/// Data is copied in both directions concurrently. When `qpp` is set the
/// remote side is wrapped with QPP obfuscation derived from `key` and
/// `qpp_count`. Returns once both directions have finished.
///
/// Fix: each copy task now calls `shutdown()` on its writer half after its
/// read loop ends. Previously the writer was only dropped, so the peer on
/// the other side never observed EOF and could block in its read loop until
/// a keepalive timeout instead of closing promptly.
async fn handle_stream(
    remote: Stream,
    target: &str,
    quiet: bool,
    qpp: bool,
    qpp_count: u32,
    key: &str,
) -> Result<()> {
    let local = TcpStream::connect(target).await?;
    
    if !quiet {
        info!("Stream opened to {}", target);
    }
    
    let (mut lr, mut lw) = tokio::io::split(local);
    let (rr, rw) = tokio::io::split(remote);
    // Optionally wrap the remote halves with QPP; box both variants behind
    // trait objects so the copy tasks below are agnostic to the wrapping.
    let (mut rr, mut rw): (
        Box<dyn tokio::io::AsyncRead + Unpin + Send>,
        Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
    ) = if qpp {
        let (r, w) = wrap_with_qpp(rr, rw, key.as_bytes(), qpp_count);
        (Box::new(r), Box::new(w))
    } else {
        (Box::new(rr), Box::new(rw))
    };

    // target -> remote direction.
    let t1 = tokio::spawn(async move {
        let mut buf = vec![0u8; 8192];
        loop {
            match lr.read(&mut buf).await {
                Ok(0) => {
                    debug!("读取目标服务器EOF");
                    break;
                }
                Ok(n) => {
                    debug!("从目标服务器读取 {} 字节", n);
                    if rw.write_all(&buf[..n]).await.is_err() {
                        error!("写入远程流失败");
                        break;
                    }
                    // Flush per chunk to keep tunnel latency low.
                    if rw.flush().await.is_err() {
                        error!("刷新远程流失败");
                        break;
                    }
                    debug!("成功转发 {} 字节到远程流", n);
                }
                Err(e) => {
                    error!("读取目标服务器错误: {}", e);
                    break;
                }
            }
        }
        // Propagate EOF/half-close to the remote peer; best-effort since the
        // stream may already be gone.
        let _ = rw.shutdown().await;
    });
    
    // remote -> target direction.
    let t2 = tokio::spawn(async move {
        let mut buf = vec![0u8; 8192];
        loop {
            match rr.read(&mut buf).await {
                Ok(0) => {
                    debug!("读取远程流EOF");
                    break;
                }
                Ok(n) => {
                    debug!("从远程流读取 {} 字节", n);
                    if lw.write_all(&buf[..n]).await.is_err() {
                        error!("写入目标服务器失败");
                        break;
                    }
                    if lw.flush().await.is_err() {
                        error!("刷新目标服务器失败");
                        break;
                    }
                    debug!("成功转发 {} 字节到目标服务器", n);
                }
                Err(e) => {
                    error!("读取远程流错误: {}", e);
                    break;
                }
            }
        }
        // Propagate EOF/half-close to the local TCP target; best-effort.
        let _ = lw.shutdown().await;
    });
    
    // Wait for both directions; task panics are intentionally ignored.
    let _ = tokio::join!(t1, t2);
    
    if !quiet {
        info!("Stream closed");
    }
    
    Ok(())
}