use std::sync::Arc;
use std::time::Duration;
use crate::config::default_dc_overrides;
use futures_util::{SinkExt, StreamExt};
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::{
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
DigitallySignedStruct, Error as TlsError, SignatureScheme,
};
use tokio::net::TcpStream;
use tokio_tungstenite::{
client_async_tls_with_config,
tungstenite::{client::IntoClientRequest, http::HeaderValue},
Connector, MaybeTlsStream, WebSocketStream,
};
use tracing::{debug, warn};
use tungstenite::Error as WsError;
use tungstenite::Message;
/// WebSocket stream type used throughout this module: a tungstenite
/// WebSocket layered over a (possibly TLS-wrapped) tokio `TcpStream`.
pub type TgWsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Returns the two `*.web.telegram.org` WebSocket host names to try for a DC,
/// in preference order.
///
/// The DC number is first mapped through [`default_dc_overrides`]; when no
/// override exists, `dc` is used unchanged. For media connections the `-1`
/// host comes first, otherwise the plain host comes first.
pub fn ws_domains(dc: u32, is_media: bool) -> Vec<String> {
    let effective_dc = default_dc_overrides().get(&dc).copied().unwrap_or(dc);

    let plain_host = format!("kws{}.web.telegram.org", effective_dc);
    let alt_host = format!("kws{}-1.web.telegram.org", effective_dc);

    if is_media {
        vec![alt_host, plain_host]
    } else {
        vec![plain_host, alt_host]
    }
}
/// Outcome of a single WebSocket connection attempt made by `connect_ws`.
#[derive(Debug)]
pub enum WsConnectResult {
/// The handshake completed with HTTP 101; carries the open stream.
Connected(TgWsStream),
/// The server answered with a redirect status (301, 302, 303, 307 or 308);
/// carries that HTTP status code.
Redirect(u16),
/// Any other failure (TCP error, timeout, bad URL, unexpected HTTP status,
/// handshake error); carries a human-readable description.
Failed(String),
}
/// Attempts one WebSocket connection to `wss://{domain}/apiws`, dialing the
/// TCP socket at `{ip}:443`.
///
/// `timeout` is applied twice, independently: once to the TCP connect and
/// once to the TLS + WebSocket handshake. When `skip_tls_verify` is true the
/// TLS connector performs no certificate verification.
///
/// Never panics or returns an error type; every outcome is folded into a
/// [`WsConnectResult`].
pub async fn connect_ws(
    ip: &str,
    domain: &str,
    skip_tls_verify: bool,
    timeout: Duration,
) -> WsConnectResult {
    // HTTP statuses that are reported as `Redirect` rather than `Failed`.
    let is_redirect = |code: u16| matches!(code, 301 | 302 | 303 | 307 | 308);

    // Step 1: plain TCP connect, bounded by `timeout`.
    let target = format!("{}:443", ip);
    let tcp = match tokio::time::timeout(timeout, TcpStream::connect(target)).await {
        Err(_) => return WsConnectResult::Failed("TCP connect timed out".into()),
        Ok(Err(e)) => return WsConnectResult::Failed(format!("TCP connect: {}", e)),
        Ok(Ok(stream)) => stream,
    };
    // Best effort: a failure to set TCP_NODELAY is deliberately ignored.
    let _ = tcp.set_nodelay(true);

    // Step 2: build the upgrade request with browser-like headers.
    let mut request = match format!("wss://{}/apiws", domain).into_client_request() {
        Ok(req) => req,
        Err(e) => return WsConnectResult::Failed(format!("bad URL: {}", e)),
    };
    let headers = request.headers_mut();
    headers.insert("Sec-WebSocket-Protocol", HeaderValue::from_static("binary"));
    headers.insert(
        "Origin",
        HeaderValue::from_static("https://web.telegram.org"),
    );
    headers.insert(
        "User-Agent",
        HeaderValue::from_static(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
             AppleWebKit/537.36 (KHTML, like Gecko) \
             Chrome/131.0.0.0 Safari/537.36",
        ),
    );

    // Step 3: TLS + WebSocket handshake over the socket, bounded by `timeout`.
    let connector = build_tls_connector(skip_tls_verify);
    let handshake = client_async_tls_with_config(request, tcp, None, Some(connector));
    let outcome = match tokio::time::timeout(timeout, handshake).await {
        Ok(inner) => inner,
        Err(_) => return WsConnectResult::Failed("WebSocket handshake timed out".into()),
    };

    // Step 4: classify the handshake outcome.
    match outcome {
        Ok((ws, response)) => match response.status().as_u16() {
            101 => WsConnectResult::Connected(ws),
            code if is_redirect(code) => WsConnectResult::Redirect(code),
            code => WsConnectResult::Failed(format!("unexpected HTTP status {}", code)),
        },
        // A non-upgrade HTTP response surfaces as an `Http` error.
        Err(WsError::Http(resp)) => {
            let code = resp.status().as_u16();
            if is_redirect(code) {
                WsConnectResult::Redirect(code)
            } else {
                WsConnectResult::Failed(format!("HTTP {} from server", code))
            }
        }
        Err(other) => WsConnectResult::Failed(other.to_string()),
    }
}
/// Reports whether a `WsConnectResult::Failed` reason string describes a
/// TCP-connect failure caused by the host name not resolving.
///
/// The reason must start with `"TCP connect:"` and contain one of the known
/// resolver error texts listed below.
fn is_dns_not_found(reason: &str) -> bool {
    // Resolver error fragments recognised as "name does not exist".
    const DNS_FAILURE_MARKERS: [&str; 4] = [
        "failed to lookup address information",
        "nodename nor servname provided",
        "No such host is known",
        "Name or service not known",
    ];

    if !reason.starts_with("TCP connect:") {
        return false;
    }
    DNS_FAILURE_MARKERS
        .iter()
        .any(|marker| reason.contains(*marker))
}
/// Tries each host from [`ws_domains`] in order, dialing `ip` every time, and
/// returns the first stream that connects.
///
/// Returns `(Some(stream), false)` on success. If every attempt fails, returns
/// `(None, all_redirects)` where `all_redirects` is `true` only when no
/// attempt ended in `WsConnectResult::Failed` (i.e. each one was a redirect).
pub async fn connect_ws_for_dc(
    ip: &str,
    dc: u32,
    is_media: bool,
    skip_tls_verify: bool,
    timeout: Duration,
) -> (Option<TgWsStream>, bool) {
    // Suffix appended to the DC number in log lines for media connections.
    let media_tag = if is_media { "m" } else { "" };
    let mut saw_hard_failure = false;

    for domain in ws_domains(dc, is_media) {
        debug!("WS trying DC{}{} → {} via {}", dc, media_tag, domain, ip);

        match connect_ws(ip, &domain, skip_tls_verify, timeout).await {
            WsConnectResult::Connected(ws) => return (Some(ws), false),
            WsConnectResult::Redirect(code) => {
                warn!(
                    "WS DC{}{} got {} from {} (redirect)",
                    dc, media_tag, code, domain
                );
            }
            WsConnectResult::Failed(reason) => {
                warn!(
                    "WS DC{}{} failed on {}: {}",
                    dc, media_tag, domain, reason
                );
                saw_hard_failure = true;
            }
        }
    }

    (None, !saw_hard_failure)
}
/// Builds the candidate WebSocket host names for `dc` under each base domain
/// in `cf_domains`, preserving the order of `cf_domains`.
///
/// Each base domain contributes two hosts, `kws{dc}.{base}` and
/// `kws{dc}-1.{base}`; the `-1` host comes first for media connections and
/// second otherwise. Unlike `ws_domains`, no DC override mapping is applied.
pub fn cf_ws_domains(dc: u32, cf_domains: &[String], is_media: bool) -> Vec<String> {
    cf_domains
        .iter()
        .flat_map(|base| {
            let plain_host = format!("kws{}.{}", dc, base);
            let alt_host = format!("kws{}-1.{}", dc, base);
            if is_media {
                [alt_host, plain_host]
            } else {
                [plain_host, alt_host]
            }
        })
        .collect()
}
pub async fn connect_cf_ws_for_dc(
dc: u32,
cf_domains: &[String],
is_media: bool,
skip_tls_verify: bool,
timeout: Duration,
) -> (Option<TgWsStream>, bool) {
let domains = cf_ws_domains(dc, cf_domains, is_media);
let mut all_redirects = true;
let mut tried: std::collections::HashSet<String> = std::collections::HashSet::new();
for domain in &domains {
if tried.contains(domain) {
continue;
}
tried.insert(domain.clone());
debug!(
"CF WS trying DC{}{} → {}",
dc,
if is_media { "m" } else { "" },
domain
);
match connect_ws(domain, domain, skip_tls_verify, timeout).await {
WsConnectResult::Connected(ws) => {
return (Some(ws), false);
}
WsConnectResult::Redirect(code) => {
warn!(
"CF WS DC{}{} got {} from {} (redirect)",
dc,
if is_media { "m" } else { "" },
code,
domain
);
}
WsConnectResult::Failed(reason) => {
if is_dns_not_found(&reason) && domain.contains("-1.") {
let fallback = domain.replacen("-1.", ".", 1);
debug!(
"CF WS DC{}{}: {} not in DNS, retrying with {}",
dc,
if is_media { "m" } else { "" },
domain,
fallback
);
tried.insert(fallback.clone());
match connect_ws(&fallback, &fallback, skip_tls_verify, timeout).await {
WsConnectResult::Connected(ws) => {
return (Some(ws), false);
}
WsConnectResult::Redirect(code) => {
warn!(
"CF WS DC{}{} got {} from {} (redirect)",
dc,
if is_media { "m" } else { "" },
code,
fallback
);
}
WsConnectResult::Failed(reason2) => {
warn!(
"CF WS DC{}{} failed on {}: {}",
dc,
if is_media { "m" } else { "" },
fallback,
reason2
);
all_redirects = false;
}
}
} else {
warn!(
"CF WS DC{}{} failed on {}: {}",
dc,
if is_media { "m" } else { "" },
domain,
reason
);
all_redirects = false;
}
}
}
}
(None, all_redirects)
}
/// Sends `data` as a single binary WebSocket message.
///
/// # Errors
/// Returns the underlying send error rendered as a `String`.
pub async fn ws_send(ws: &mut TgWsStream, data: Vec<u8>) -> Result<(), String> {
    let frame = Message::Binary(data);
    match ws.send(frame).await {
        Ok(()) => Ok(()),
        Err(err) => Err(err.to_string()),
    }
}
/// Waits for the next data-carrying message and returns its payload bytes.
///
/// Binary payloads are returned as-is and text payloads as their bytes.
/// Ping, pong and any other non-data messages are skipped. Returns `None`
/// when a close message arrives, the stream ends, or a receive error occurs.
#[allow(dead_code)]
pub async fn ws_recv(ws: &mut TgWsStream) -> Option<Vec<u8>> {
    while let Some(item) = ws.next().await {
        match item {
            Ok(Message::Binary(bytes)) => return Some(bytes),
            Ok(Message::Text(text)) => return Some(text.into_bytes()),
            // A close message or a transport error both end the receive loop.
            Ok(Message::Close(_)) | Err(_) => return None,
            // Ping, pong and anything else carry no payload for the caller.
            Ok(_) => continue,
        }
    }
    // The stream was exhausted without yielding a data message.
    None
}
/// Selects the TLS connector for a connection attempt: the verifying rustls
/// configuration by default, or the non-verifying one when `skip_verify` is
/// set.
fn build_tls_connector(skip_verify: bool) -> Connector {
    if !skip_verify {
        return Connector::Rustls(Arc::new(build_default_rustls_config()));
    }
    build_no_verify_connector()
}
/// Builds a rustls client configuration that trusts the root store returned
/// by `webpki_roots_store` and presents no client certificate.
fn build_default_rustls_config() -> rustls::ClientConfig {
    let trusted_roots = webpki_roots_store();
    let builder = rustls::ClientConfig::builder().with_root_certificates(trusted_roots);
    builder.with_no_client_auth()
}
/// Builds a TLS connector whose certificate verifier is `NoVerifier`, i.e.
/// one that accepts any server certificate. No client certificate is
/// presented.
fn build_no_verify_connector() -> Connector {
    let verifier = Arc::new(NoVerifier);
    let builder = rustls::ClientConfig::builder()
        .dangerous()
        .with_custom_certificate_verifier(verifier);
    Connector::Rustls(Arc::new(builder.with_no_client_auth()))
}
/// Creates a root certificate store populated with a copy of every trust
/// anchor in `webpki_roots::TLS_SERVER_ROOTS`.
fn webpki_roots_store() -> rustls::RootCertStore {
    let anchors = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
    let mut roots = rustls::RootCertStore::empty();
    roots.extend(anchors);
    roots
}
/// Certificate verifier that performs no verification at all.
///
/// Every certificate chain and every handshake signature is accepted
/// unconditionally, so a connection using it is not authenticated. It is
/// only installed by `build_no_verify_connector`.
#[derive(Debug)]
struct NoVerifier;

impl ServerCertVerifier for NoVerifier {
    /// Accepts the presented certificate chain without inspecting it.
    fn verify_server_cert(
        &self,
        _leaf: &CertificateDer<'_>,
        _chain: &[CertificateDer<'_>],
        _name: &ServerName<'_>,
        _ocsp_response: &[u8],
        _at: UnixTime,
    ) -> Result<ServerCertVerified, TlsError> {
        let accepted = ServerCertVerified::assertion();
        Ok(accepted)
    }

    /// Accepts any TLS 1.2 handshake signature without checking it.
    fn verify_tls12_signature(
        &self,
        _payload: &[u8],
        _signer: &CertificateDer<'_>,
        _signature: &DigitallySignedStruct,
    ) -> Result<HandshakeSignatureValid, TlsError> {
        let accepted = HandshakeSignatureValid::assertion();
        Ok(accepted)
    }

    /// Accepts any TLS 1.3 handshake signature without checking it.
    fn verify_tls13_signature(
        &self,
        _payload: &[u8],
        _signer: &CertificateDer<'_>,
        _signature: &DigitallySignedStruct,
    ) -> Result<HandshakeSignatureValid, TlsError> {
        let accepted = HandshakeSignatureValid::assertion();
        Ok(accepted)
    }

    /// Lists the signature schemes this verifier reports as supported.
    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
        [
            SignatureScheme::RSA_PKCS1_SHA256,
            SignatureScheme::RSA_PKCS1_SHA384,
            SignatureScheme::RSA_PKCS1_SHA512,
            SignatureScheme::ECDSA_NISTP256_SHA256,
            SignatureScheme::ECDSA_NISTP384_SHA384,
            SignatureScheme::ECDSA_NISTP521_SHA512,
            SignatureScheme::RSA_PSS_SHA256,
            SignatureScheme::RSA_PSS_SHA384,
            SignatureScheme::RSA_PSS_SHA512,
            SignatureScheme::ED25519,
        ]
        .to_vec()
    }
}