use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
fn soft_nofile_limit() -> usize {
#[cfg(target_os = "linux")]
{
if let Ok(content) = std::fs::read_to_string("/proc/self/limits") {
for line in content.lines() {
if line.starts_with("Max open files") {
if let Some(soft_str) = line.split_whitespace().nth(3) {
if soft_str == "unlimited" {
return usize::MAX;
}
if let Ok(n) = soft_str.parse::<usize>() {
return n;
}
}
}
}
}
}
1024 }
fn auto_max_connections(fd_limit: usize, pool_size: usize, dc_buckets: usize) -> usize {
if fd_limit == usize::MAX {
return 512;
}
let reserved = 1 + pool_size * dc_buckets * 2 + 32;
(fd_limit.saturating_sub(reserved) / 2).max(4)
}
mod config;
mod crypto;
mod pool;
mod proxy;
mod splitter;
mod ws_client;
use config::Config;
use pool::WsPool;
#[tokio::main]
async fn main() {
rustls::crypto::ring::default_provider()
.install_default()
.expect("failed to install rustls ring CryptoProvider");
let config = Config::from_args();
let log_level = if config.quiet {
"off"
} else if config.verbose {
"debug"
} else {
"info"
};
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| log_level.into()),
)
.init();
let addr: SocketAddr = format!("{}:{}", config.host, config.port)
.parse()
.expect("invalid listen address");
let listener = TcpListener::bind(addr)
.await
.unwrap_or_else(|e| panic!("cannot bind {}: {}", addr, e));
let fd_limit = soft_nofile_limit();
let dc_redirects = config.dc_redirects();
let dc_buckets = dc_redirects.len() * 2; let max_connections = match config.max_connections {
Some(n) => {
let safe = auto_max_connections(fd_limit, config.pool_size, dc_buckets);
if n > safe {
warn!(
"max-connections={} may exceed the safe limit for this system's \
FD budget (fd-limit={}, recommended ≤{}). \
Consider raising `ulimit -n` or reducing --max-connections.",
n, fd_limit, safe
);
}
n
}
None => auto_max_connections(fd_limit, config.pool_size, dc_buckets),
};
let secret = config.secret.as_deref().unwrap_or("");
let link_host = config.link_host();
let tg_link = format!(
"tg://proxy?server={}&port={}&secret=dd{}",
link_host, config.port, secret
);
info!("{}", "=".repeat(60));
info!(" Telegram MTProto WS Bridge Proxy (tg-ws-proxy-rs)");
info!(" Listening on {}:{}", config.host, config.port);
info!(" Secret: {}", secret);
info!(" Target DC IPs:");
let mut dcs: Vec<_> = dc_redirects.iter().collect();
dcs.sort_by_key(|(k, _)| *k);
for (dc, ip) in &dcs {
info!(" DC{}: {}", dc, ip);
}
if config.skip_tls_verify {
info!(" ⚠ TLS certificate verification DISABLED");
}
info!(" Max connections: {} (fd-limit: {})", max_connections, fd_limit);
info!("{}", "=".repeat(60));
info!(" Telegram proxy link (use this on all devices):");
info!(" {}", tg_link);
if link_host != config.host {
info!(
" ℹ Link uses auto-detected IP {}. \
Use --link-ip <IP> to override.",
link_host
);
} else if matches!(config.host.as_str(), "127.0.0.1" | "::1") {
warn!(
" ⚠ Link shows {} — only the local machine can use this link. \
Run with --host 0.0.0.0 (or --link-ip <router-LAN-IP>) \
so other devices on the network can connect.",
config.host
);
}
info!("{}", "=".repeat(60));
let pool = Arc::new(WsPool::new(config.pool_size));
{
let pool_clone = pool.clone();
let config_clone = config.clone();
tokio::spawn(async move {
pool_clone.warmup(&config_clone).await;
});
}
const EMFILE: i32 = 24; const ENFILE: i32 = 23; let semaphore = Arc::new(Semaphore::new(max_connections));
loop {
let permit = Arc::clone(&semaphore)
.acquire_owned()
.await
.expect("semaphore closed unexpectedly");
match listener.accept().await {
Ok((stream, peer_addr)) => {
let cfg = config.clone();
let pool = pool.clone();
tokio::spawn(async move {
let _permit = permit;
proxy::handle_client(stream, peer_addr, cfg, pool).await;
});
}
Err(e) => {
if matches!(e.raw_os_error(), Some(EMFILE) | Some(ENFILE)) {
warn!("accept error: {} — backing off to allow FDs to free", e);
tokio::time::sleep(Duration::from_millis(500)).await;
} else {
error!("accept error: {}", e);
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
}
}
}