use std::io::IsTerminal as _;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
/// Returns the soft limit on open file descriptors for this process.
///
/// On Linux the value is taken from the fourth whitespace-separated field of
/// the `Max open files` row in `/proc/self/limits`; the literal `unlimited`
/// is reported as `usize::MAX`. If the file cannot be read, no matching row
/// yields a usable value, or the target is not Linux, the function falls back
/// to `1024`.
fn soft_nofile_limit() -> usize {
    #[cfg(target_os = "linux")]
    {
        let from_proc = std::fs::read_to_string("/proc/self/limits")
            .ok()
            .and_then(|limits| {
                limits
                    .lines()
                    .filter(|row| row.starts_with("Max open files"))
                    // Rows whose soft-limit column is missing or unparsable
                    // are skipped rather than ending the search.
                    .find_map(|row| match row.split_whitespace().nth(3)? {
                        "unlimited" => Some(usize::MAX),
                        digits => digits.parse::<usize>().ok(),
                    })
            });
        if let Some(limit) = from_proc {
            return limit;
        }
    }
    // Fallback used when nothing could be determined above.
    1024
}
/// Derives a connection cap from the file-descriptor budget.
///
/// * `fd_limit` – soft descriptor limit; `usize::MAX` is treated as
///   "unlimited" and yields a fixed cap of `512`.
/// * `pool_size` – number of pooled connections per bucket.
/// * `dc_buckets` – number of pool buckets.
///
/// The reservation is `1 + pool_size * dc_buckets * 2 + 32` descriptors; half
/// of whatever remains is returned, never less than `4`.
///
/// All arithmetic saturates, so very large `pool_size` / `dc_buckets` values
/// (both can originate from user configuration) collapse to the minimum cap
/// instead of panicking in debug builds or wrapping in release builds.
fn auto_max_connections(fd_limit: usize, pool_size: usize, dc_buckets: usize) -> usize {
    if fd_limit == usize::MAX {
        return 512;
    }
    // NOTE(review): the visible caller already passes `redirects * 2` as
    // `dc_buckets`; confirm the additional `* 2` here is intended headroom.
    let pool_fds = pool_size.saturating_mul(dc_buckets).saturating_mul(2);
    // 1 + 32 fixed descriptors on top of the pool reservation.
    let reserved = pool_fds.saturating_add(1 + 32);
    // Presumably each client connection costs two descriptors (client socket
    // plus upstream), hence the division by two — TODO confirm.
    (fd_limit.saturating_sub(reserved) / 2).max(4)
}
mod config;
mod crypto;
mod pool;
mod proxy;
mod splitter;
mod ws_client;
use config::Config;
use pool::WsPool;
/// Process entry point.
///
/// Steps, in order:
/// 1. install the rustls `ring` crypto provider;
/// 2. parse the CLI configuration and initialise `tracing` logging
///    (to a file when `config.log_file` is set, otherwise to the default
///    writer);
/// 3. bind the TCP listener on `config.host:config.port`;
/// 4. pick the connection cap (explicit `--max-connections` or a value
///    derived from the descriptor limit) and print the startup banner;
/// 5. create the WebSocket pool, start its warm-up in the background, and
///    run the accept loop forever, bounded by a semaphore.
///
/// # Panics
/// Panics on unrecoverable startup errors: crypto provider installation,
/// log file creation, an unparsable listen address, or a failed bind.
#[tokio::main]
async fn main() {
    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("failed to install rustls ring CryptoProvider");

    let config = Config::from_args();

    // ── Logging ──────────────────────────────────────────────────────────
    // CLI flags choose the fallback level; a valid filter from the
    // environment takes precedence over it.
    let log_level = if config.quiet {
        "off"
    } else if config.verbose {
        "debug"
    } else {
        "info"
    };
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| log_level.into());

    if let Some(ref path) = config.log_file {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .unwrap_or_else(|e| panic!("cannot open log file '{}': {}", path, e));
        tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .with_ansi(false)
            .with_writer(file)
            .init();
    } else {
        // The fmt subscriber writes to stdout unless given another writer,
        // so colour must follow stdout's terminal status; checking stderr
        // would leak escape codes into a redirected stdout.
        let use_ansi = std::io::stdout().is_terminal() && !cfg!(windows);
        tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .with_ansi(use_ansi)
            .init();
    }

    // ── Listener ─────────────────────────────────────────────────────────
    // A bare IPv6 literal (e.g. `::1`) must be bracketed before the port is
    // appended, otherwise `::1:1080` fails to parse as a socket address.
    let listen_addr = if config.host.contains(':') && !config.host.starts_with('[') {
        format!("[{}]:{}", config.host, config.port)
    } else {
        format!("{}:{}", config.host, config.port)
    };
    let addr: SocketAddr = listen_addr.parse().expect("invalid listen address");
    let listener = TcpListener::bind(addr)
        .await
        .unwrap_or_else(|e| panic!("cannot bind {}: {}", addr, e));

    // ── Connection budget ────────────────────────────────────────────────
    let fd_limit = soft_nofile_limit();
    let dc_redirects = config.dc_redirects();
    let dc_buckets = dc_redirects.len() * 2;
    let safe_max = auto_max_connections(fd_limit, config.pool_size, dc_buckets);
    let max_connections = match config.max_connections {
        // An explicit value is honoured as-is, but flagged when it exceeds
        // what the descriptor budget suggests.
        Some(n) => {
            if n > safe_max {
                warn!(
                    "max-connections={} may exceed the safe limit for this system's \
                     FD budget (fd-limit={}, recommended ≤{}). \
                     Consider raising `ulimit -n` or reducing --max-connections.",
                    n, fd_limit, safe_max
                );
            }
            n
        }
        None => safe_max,
    };

    // ── Startup banner ───────────────────────────────────────────────────
    let secret = config.secret.as_deref().unwrap_or("");
    let link_host = config.link_host();
    let tg_link = format!(
        "tg://proxy?server={}&port={}&secret=dd{}",
        link_host, config.port, secret
    );

    info!("{}", "=".repeat(60));
    info!(" Telegram MTProto WS Bridge Proxy (tg-ws-proxy-rs)");
    info!(" Listening on {}:{}", config.host, config.port);
    info!(" Secret: {}", secret);
    info!(" Target DC IPs:");
    // Sorted by DC key so the listing is stable between runs.
    let mut dcs: Vec<_> = dc_redirects.iter().collect();
    dcs.sort_by_key(|(k, _)| *k);
    for (dc, ip) in &dcs {
        info!(" DC{}: {}", dc, ip);
    }
    if config.skip_tls_verify {
        info!(" ⚠ TLS certificate verification DISABLED");
    }
    if !config.cf_domains.is_empty() {
        info!(" Cloudflare proxy domain(s):");
        for d in &config.cf_domains {
            info!(" {} (kws{{N}}.{} subdomains)", d, d);
        }
        if config.cf_priority {
            info!(" ⚡ CF priority mode: CF proxy is tried BEFORE direct WS");
        }
    }
    if !config.mtproto_proxies.is_empty() {
        info!(" Upstream MTProto proxies (WS fallback):");
        for p in &config.mtproto_proxies {
            info!(" {}:{}", p.host, p.port);
        }
    }
    info!(
        " Max connections: {} (fd-limit: {})",
        max_connections, fd_limit
    );
    info!("{}", "=".repeat(60));
    info!(" Telegram proxy link (use this on all devices):");
    info!(" {}", tg_link);
    if link_host != config.host {
        info!(
            " ℹ Link uses auto-detected IP {}. \
             Use --link-ip <IP> to override.",
            link_host
        );
    } else if matches!(config.host.as_str(), "127.0.0.1" | "::1") {
        warn!(
            " ⚠ Link shows {} — only the local machine can use this link. \
             Run with --host 0.0.0.0 (or --link-ip <router-LAN-IP>) \
             so other devices on the network can connect.",
            config.host
        );
    }
    info!("{}", "=".repeat(60));

    // ── Pool ─────────────────────────────────────────────────────────────
    let pool = Arc::new(WsPool::new(
        config.pool_size,
        Duration::from_secs(config.pool_max_age),
    ));
    {
        // Warm-up runs in a detached task so accepting clients is not delayed.
        let pool_clone = pool.clone();
        let config_clone = config.clone();
        tokio::spawn(async move {
            pool_clone.warmup(&config_clone).await;
        });
    }

    // ── Accept loop ──────────────────────────────────────────────────────
    // Raw OS error numbers compared against `accept` failures below; these
    // are the usual Unix values for "too many open files" (per-process and
    // system-wide).
    const EMFILE: i32 = 24;
    const ENFILE: i32 = 23;

    let semaphore = Arc::new(Semaphore::new(max_connections));
    loop {
        // A permit is taken *before* accepting, so once the cap is reached
        // no further sockets are accepted until a handler finishes.
        let permit = Arc::clone(&semaphore)
            .acquire_owned()
            .await
            .expect("semaphore closed unexpectedly");

        match listener.accept().await {
            Ok((stream, peer_addr)) => {
                let cfg = config.clone();
                let pool = pool.clone();
                tokio::spawn(async move {
                    // Held for the lifetime of the handler; dropping it
                    // frees the slot.
                    let _permit = permit;
                    proxy::handle_client(stream, peer_addr, cfg, pool).await;
                });
            }
            Err(e) => {
                // On failure the permit is dropped at the end of this
                // iteration, so the slot is not lost.
                if matches!(e.raw_os_error(), Some(EMFILE) | Some(ENFILE)) {
                    warn!("accept error: {} — backing off to allow FDs to free", e);
                    tokio::time::sleep(Duration::from_millis(500)).await;
                } else {
                    error!("accept error: {}", e);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                }
            }
        }
    }
}