use anyhow::Result;
use clap::Parser;
use kcptun_rust::{CompStream, ServerConfig, create_block_crypt, derive_key, wrap_with_qpp};
use rust_tokio_kcp::{KcpConfig, KcpListener, KcpNoDelayConfig, KcpStream};
use smux_rust::{server, Config as SmuxConfig, Stream};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::{debug, error, info};
#[derive(Parser)]
#[command(name = "kcptun-server")]
#[command(about = "KCP tunnel server", version = "0.1.0")]
struct Args {
#[arg(short = 'l', long, default_value = ":29900")]
listen: String,
#[arg(short = 't', long, default_value = "127.0.0.1:12948")]
target: String,
#[arg(long, default_value = "it's a secrect")]
key: String,
#[arg(long, default_value = "aes")]
crypt: String,
#[arg(long, default_value = "fast")]
mode: String,
#[arg(long, default_value = "1350")]
mtu: u32,
#[arg(long, default_value = "0")]
ratelimit: u32,
#[arg(long, default_value = "1024")]
sndwnd: u32,
#[arg(long, default_value = "1024")]
rcvwnd: u32,
#[arg(long, default_value = "10")]
datashard: u32,
#[arg(long, default_value = "3")]
parityshard: u32,
#[arg(long, default_value = "0")]
dscp: u32,
#[arg(long)]
nocomp: bool,
#[arg(long)]
acknodelay: bool,
#[arg(long, default_value = "2")]
smuxver: u32,
#[arg(long, default_value = "4194304")]
smuxbuf: u32,
#[arg(long, default_value = "8192")]
framesize: u32,
#[arg(long, default_value = "2097152")]
streambuf: u32,
#[arg(long, default_value = "10")]
keepalive: u32,
#[arg(long, default_value = "30")]
closewait: u32,
#[arg(short = 'c', long)]
config: Option<String>,
#[arg(long)]
quiet: bool,
#[arg(long)]
qpp: bool,
#[arg(long, default_value = "61")]
qpp_count: u32,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
)
.init();
let args = Args::parse();
let config = if let Some(config_path) = args.config {
ServerConfig::from_json(config_path)?
} else {
let mut cfg = ServerConfig {
listen: args.listen,
target: args.target,
key: args.key,
crypt: args.crypt,
mode: args.mode,
mtu: args.mtu,
ratelimit: args.ratelimit,
sndwnd: args.sndwnd,
rcvwnd: args.rcvwnd,
datashard: args.datashard,
parityshard: args.parityshard,
dscp: args.dscp,
nocomp: args.nocomp,
acknodelay: args.acknodelay,
nodelay: 0,
interval: 50,
resend: 0,
nc: 0,
sockbuf: 4194304,
smuxver: args.smuxver,
smuxbuf: args.smuxbuf,
framesize: args.framesize,
streambuf: args.streambuf,
keepalive: args.keepalive,
log: String::new(),
snmplog: String::new(),
snmpperiod: 60,
pprof: false,
quiet: args.quiet,
tcp: false,
qpp: args.qpp,
qpp_count: args.qpp_count,
closewait: args.closewait,
};
cfg.apply_mode();
cfg
};
info!("version: 0.1.0");
info!("listening on: {}", config.listen);
info!("target: {}", config.target);
info!("encryption: {}", config.crypt);
info!("mode: {}", config.mode);
info!("nodelay parameters: {} {} {} {}",
config.nodelay, config.interval, config.resend, config.nc);
info!("sndwnd: {}, rcvwnd: {}", config.sndwnd, config.rcvwnd);
info!("compression: {}", !config.nocomp);
info!("mtu: {}", config.mtu);
info!("datashard: {}, parityshard: {}", config.datashard, config.parityshard);
let key = derive_key(&config.key);
let listen_addr: std::net::SocketAddr = config.listen.parse()?;
let block_crypt = create_block_crypt(&config.crypt, &key)?;
let kcp_config = KcpConfig {
mtu: config.mtu as usize,
nodelay: KcpNoDelayConfig {
nodelay: config.nodelay != 0,
interval: config.interval as i32,
resend: config.resend as i32,
nc: config.nc != 0,
},
wnd_size: (config.sndwnd as u16, config.rcvwnd as u16),
stream: true,
flush_write: false,
flush_acks_input: config.acknodelay,
fec_data_shards: config.datashard as usize,
fec_parity_shards: config.parityshard as usize,
crypt: block_crypt,
..Default::default()
};
let config_arc = Arc::new(config);
let mut listener = KcpListener::bind(kcp_config, listen_addr).await?;
info!("Listening on {}", listen_addr);
info!("Waiting for connections...");
loop {
info!("Calling listener.accept()...");
match listener.accept().await {
Ok((kcp_stream, peer_addr)) => {
info!("Accepted connection from {}", peer_addr);
let config_clone = config_arc.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(kcp_stream, &config_clone).await {
error!("Error handling connection: {}", e);
}
});
}
Err(e) => {
error!("Error accepting connection: {}", e);
}
}
}
}
async fn handle_connection(
kcp_stream: KcpStream,
config: &ServerConfig,
) -> Result<()> {
let smux_config = SmuxConfig {
version: config.smuxver as u8,
keep_alive_disabled: false,
keep_alive_interval: Duration::from_secs(config.keepalive as u64),
keep_alive_timeout: Duration::from_secs(config.keepalive as u64 * 3),
max_frame_size: config.framesize as usize,
max_receive_buffer: config.smuxbuf as usize,
max_stream_buffer: config.streambuf as usize,
};
let session = if config.nocomp {
server(Box::new(kcp_stream), Some(smux_config)).await?
} else {
let comp_stream = CompStream::new(kcp_stream);
server(Box::new(comp_stream), Some(smux_config)).await?
};
loop {
match session.accept_stream().await {
Ok(stream) => {
let target = config.target.clone();
let quiet = config.quiet;
let qpp = config.qpp;
let qpp_count = config.qpp_count;
let key = config.key.clone();
tokio::spawn(async move {
if let Err(e) = handle_stream(stream, &target, quiet, qpp, qpp_count, &key).await
{
if !quiet {
error!("Error handling stream: {}", e);
}
}
});
}
Err(e) => {
error!("Failed to accept stream: {}", e);
break;
}
}
}
Ok(())
}
async fn handle_stream(
remote: Stream,
target: &str,
quiet: bool,
qpp: bool,
qpp_count: u32,
key: &str,
) -> Result<()> {
let local = TcpStream::connect(target).await?;
if !quiet {
info!("Stream opened to {}", target);
}
let (mut lr, mut lw) = tokio::io::split(local);
let (rr, rw) = tokio::io::split(remote);
let (mut rr, mut rw): (
Box<dyn tokio::io::AsyncRead + Unpin + Send>,
Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
) = if qpp {
let (r, w) = wrap_with_qpp(rr, rw, key.as_bytes(), qpp_count);
(Box::new(r), Box::new(w))
} else {
(Box::new(rr), Box::new(rw))
};
let t1 = tokio::spawn(async move {
let mut buf = vec![0u8; 8192];
loop {
match lr.read(&mut buf).await {
Ok(0) => {
debug!("读取目标服务器EOF");
break;
}
Ok(n) => {
debug!("从目标服务器读取 {} 字节", n);
if rw.write_all(&buf[..n]).await.is_err() {
error!("写入远程流失败");
break;
}
if rw.flush().await.is_err() {
error!("刷新远程流失败");
break;
}
debug!("成功转发 {} 字节到远程流", n);
}
Err(e) => {
error!("读取目标服务器错误: {}", e);
break;
}
}
}
});
let t2 = tokio::spawn(async move {
let mut buf = vec![0u8; 8192];
loop {
match rr.read(&mut buf).await {
Ok(0) => {
debug!("读取远程流EOF");
break;
}
Ok(n) => {
debug!("从远程流读取 {} 字节", n);
if lw.write_all(&buf[..n]).await.is_err() {
error!("写入目标服务器失败");
break;
}
if lw.flush().await.is_err() {
error!("刷新目标服务器失败");
break;
}
debug!("成功转发 {} 字节到目标服务器", n);
}
Err(e) => {
error!("读取远程流错误: {}", e);
break;
}
}
}
});
let _ = tokio::join!(t1, t2);
if !quiet {
info!("Stream closed");
}
Ok(())
}