1use std::sync::Arc;
14use std::time::Duration;
15
16use socket2::TcpKeepalive;
17use tokio::io::AsyncWriteExt;
18use tokio::net::TcpStream;
19
20use ferogram_mtproto::{EncryptedSession, Session, authentication as auth};
21use ferogram_tl_types as tl;
22
23use crate::error::ConnectError;
24use crate::frame::{recv_frame_plain, send_frame};
25use crate::pfs::decode_bind_response;
26use crate::transport::recv_raw_frame;
27use crate::transport_kind::TransportKind;
28
/// NOTE(review): not referenced in this part of the file. By its name this is presumably the
/// keep-alive ping period in seconds; confirm against the code that uses it.
pub const PING_DELAY_SECS: u64 = 60;
/// NOTE(review): not referenced in this part of the file. By its name this is presumably the
/// number of seconds without a ping after which the connection is dropped; confirm against
/// the code that uses it.
pub const NO_PING_DISCONNECT: i32 = 75;

/// Value, in seconds, passed to `TcpKeepalive::with_time` for directly opened TCP streams.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
/// Value, in seconds, passed to `TcpKeepalive::with_interval` for directly opened TCP streams.
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
/// Value passed to `TcpKeepalive::with_retries`; that call is compiled out on Windows, so the
/// constant is gated the same way to avoid an unused-item warning.
#[cfg(not(target_os = "windows"))]
const TCP_KEEPALIVE_PROBES: u32 = 3;
36
37#[derive(Clone)]
45pub enum FrameKind {
46 Abridged,
47 Intermediate,
48 Full {
49 send_seqno: Arc<std::sync::atomic::AtomicU32>,
50 recv_seqno: Arc<std::sync::atomic::AtomicU32>,
51 },
52 Obfuscated {
54 cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
55 },
56 PaddedIntermediate {
58 cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
59 },
60 FakeTls {
62 cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
63 },
64}
65
/// One server salt together with the time window in which it is valid.
///
/// Both window bounds use the same signed 32-bit type, matching the two `int` fields of the
/// MTProto `future_salt` constructor, so they can be compared with each other (and with an
/// `i32` server time) without casts.
#[derive(Clone, Debug)]
pub struct FutureSalt {
    /// Start of the validity window.
    pub valid_since: i32,
    /// End of the validity window; same unit and signedness as `valid_since`.
    pub valid_until: i32,
    /// The salt value itself.
    pub salt: i64,
}
78
/// NOTE(review): not referenced in this part of the file. By its name and `i32` type this is
/// presumably a delay, in seconds, applied relative to `FutureSalt::valid_since` before a
/// salt is put to use; confirm against the code that uses it.
pub const SALT_USE_DELAY: i32 = 60;
82
/// An open stream to a DC together with the session state needed to talk over it.
pub struct Connection {
    /// The TCP stream (direct, via SOCKS5 or via MTProxy) after the transport preamble was sent.
    pub stream: TcpStream,
    /// Encrypted session used for traffic on this connection. After a successful PFS bind in
    /// `connect_with_key` this is the session built on the temporary key.
    pub enc: EncryptedSession,
    /// Framing mode chosen when the transport was initialised.
    pub frame_kind: FrameKind,
    /// `Some(permanent key)` only when `connect_with_key` bound a temporary PFS key; `None`
    /// for every other construction path. `auth_key_bytes` prefers this value over the key
    /// held by `enc`.
    pub perm_auth_key: Option<[u8; 256]>,
}
92
93impl Connection {
94 async fn open_stream(
96 addr: &str,
97 socks5: Option<&crate::socks5::Socks5Config>,
98 transport: &TransportKind,
99 dc_id: i16,
100 ) -> Result<(TcpStream, FrameKind), ConnectError> {
101 let stream = match socks5 {
102 Some(proxy) => proxy.connect(addr).await?,
103 None => {
104 let stream = TcpStream::connect(addr).await.map_err(ConnectError::Io)?;
105 stream.set_nodelay(true).ok();
106 {
107 let sock = socket2::SockRef::from(&stream);
108 let keepalive = TcpKeepalive::new()
109 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
110 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
111 #[cfg(not(target_os = "windows"))]
112 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
113 sock.set_tcp_keepalive(&keepalive).ok();
114 }
115 stream
116 }
117 };
118 Self::apply_transport_init(stream, transport, dc_id).await
119 }
120
121 async fn open_stream_mtproxy(
124 mtproxy: &crate::proxy::MtProxyConfig,
125 dc_id: i16,
126 ) -> Result<(TcpStream, FrameKind), ConnectError> {
127 let stream = mtproxy.connect().await?;
128 stream.set_nodelay(true).ok();
129 Self::apply_transport_init(stream, &mtproxy.transport, dc_id).await
130 }
131
132 async fn apply_transport_init(
133 mut stream: TcpStream,
134 transport: &TransportKind,
135 dc_id: i16,
136 ) -> Result<(TcpStream, FrameKind), ConnectError> {
137 match transport {
138 TransportKind::Abridged => {
139 stream.write_all(&[0xef]).await?;
140 Ok((stream, FrameKind::Abridged))
141 }
142 TransportKind::Intermediate => {
143 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
144 Ok((stream, FrameKind::Intermediate))
145 }
146 TransportKind::Full => {
147 Ok((
149 stream,
150 FrameKind::Full {
151 send_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
152 recv_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
153 },
154 ))
155 }
156 TransportKind::Obfuscated { secret } => {
157 let proxy_secret = secret.as_ref().map(|s| s.as_ref());
158 let (nonce, cipher) =
159 ferogram_crypto::build_obfuscated_init(0xef, dc_id, proxy_secret);
160 stream.write_all(&nonce).await?;
161 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
162 Ok((stream, FrameKind::Obfuscated { cipher: cipher_arc }))
163 }
164 TransportKind::PaddedIntermediate { secret } => {
165 let proxy_secret = secret.as_ref().map(|s| s.as_ref());
166 let (nonce, cipher) =
167 ferogram_crypto::build_obfuscated_init(0xdd, dc_id, proxy_secret);
168 stream.write_all(&nonce).await?;
169 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
170 Ok((stream, FrameKind::PaddedIntermediate { cipher: cipher_arc }))
171 }
172 TransportKind::FakeTls { secret, domain } => {
173 let domain_bytes = domain.as_bytes();
177 let mut session_id = [0u8; 32];
178 ferogram_crypto::fill_random(&mut session_id);
179
180 let cipher_suites: &[u8] = &[0x00, 0x04, 0x13, 0x01, 0x13, 0x02];
182 let compression: &[u8] = &[0x01, 0x00];
183 let sni_name_len = domain_bytes.len() as u16;
184 let sni_list_len = sni_name_len + 3;
185 let sni_ext_len = sni_list_len + 2;
186 let mut sni_ext = Vec::new();
187 sni_ext.extend_from_slice(&[0x00, 0x00]);
188 sni_ext.extend_from_slice(&sni_ext_len.to_be_bytes());
189 sni_ext.extend_from_slice(&sni_list_len.to_be_bytes());
190 sni_ext.push(0x00);
191 sni_ext.extend_from_slice(&sni_name_len.to_be_bytes());
192 sni_ext.extend_from_slice(domain_bytes);
193 let sup_ver: &[u8] = &[0x00, 0x2b, 0x00, 0x03, 0x02, 0x03, 0x04];
194 let sup_grp: &[u8] = &[0x00, 0x0a, 0x00, 0x04, 0x00, 0x02, 0x00, 0x1d];
195 let sess_tick: &[u8] = &[0x00, 0x23, 0x00, 0x00];
196 let ext_body_len = sni_ext.len() + sup_ver.len() + sup_grp.len() + sess_tick.len();
197 let mut extensions = Vec::new();
198 extensions.extend_from_slice(&(ext_body_len as u16).to_be_bytes());
199 extensions.extend_from_slice(&sni_ext);
200 extensions.extend_from_slice(sup_ver);
201 extensions.extend_from_slice(sup_grp);
202 extensions.extend_from_slice(sess_tick);
203
204 let mut hello_body = Vec::new();
205 hello_body.extend_from_slice(&[0x03, 0x03]);
206 hello_body.extend_from_slice(&[0u8; 32]); hello_body.push(session_id.len() as u8);
208 hello_body.extend_from_slice(&session_id);
209 hello_body.extend_from_slice(cipher_suites);
210 hello_body.extend_from_slice(compression);
211 hello_body.extend_from_slice(&extensions);
212
213 let hs_len = hello_body.len() as u32;
214 let mut handshake = vec![
215 0x01,
216 ((hs_len >> 16) & 0xff) as u8,
217 ((hs_len >> 8) & 0xff) as u8,
218 (hs_len & 0xff) as u8,
219 ];
220 handshake.extend_from_slice(&hello_body);
221
222 let rec_len = handshake.len() as u16;
223 let mut record = Vec::new();
224 record.push(0x16);
225 record.extend_from_slice(&[0x03, 0x01]);
226 record.extend_from_slice(&rec_len.to_be_bytes());
227 record.extend_from_slice(&handshake);
228
229 let random_offset = 5 + 4 + 2; let (hmac_result, cipher) = ferogram_crypto::build_fake_tls_keys(secret, &record);
235 record[random_offset..random_offset + 32].copy_from_slice(&hmac_result);
236 stream.write_all(&record).await?;
237
238 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
239 Ok((stream, FrameKind::FakeTls { cipher: cipher_arc }))
240 }
241 TransportKind::Http => {
242 stream.write_all(&[0xef]).await?;
244 Ok((stream, FrameKind::Abridged))
245 }
246 }
247 }
248
249 pub async fn open_stream_pub(
254 addr: &str,
255 dc_id: i16,
256 transport: &TransportKind,
257 socks5: Option<&crate::socks5::Socks5Config>,
258 mtproxy: Option<&crate::proxy::MtProxyConfig>,
259 ) -> Result<(TcpStream, FrameKind), ConnectError> {
260 if let Some(mp) = mtproxy {
261 Self::open_stream_mtproxy(mp, dc_id).await
262 } else {
263 Self::open_stream(addr, socks5, transport, dc_id).await
264 }
265 }
266
267 pub async fn connect_raw(
272 addr: &str,
273 socks5: Option<&crate::socks5::Socks5Config>,
274 mtproxy: Option<&crate::proxy::MtProxyConfig>,
275 transport: &TransportKind,
276 dc_id: i16,
277 ) -> Result<Self, ConnectError> {
278 let t_label = match transport {
279 TransportKind::Abridged => "Abridged",
280 TransportKind::Obfuscated { .. } => "Obfuscated",
281 TransportKind::PaddedIntermediate { .. } => "PaddedIntermediate",
282 TransportKind::Http => "Http",
283 TransportKind::Intermediate => "Intermediate",
284 TransportKind::Full => "Full",
285 TransportKind::FakeTls { .. } => "FakeTls",
286 };
287 tracing::debug!("[ferogram::connect] starting DH handshake with {addr} via {t_label}");
288
289 let addr2 = addr.to_string();
290 let socks5_c = socks5.cloned();
291 let mtproxy_c = mtproxy.cloned();
292 let transport_c = transport.clone();
293
294 let fut = async move {
295 let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
296 Self::open_stream_mtproxy(mp, dc_id).await?
297 } else {
298 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
299 };
300
301 let mut plain = Session::new();
302
303 let (req1, s1) = auth::step1().map_err(|e| ConnectError::other(e.to_string()))?;
304 send_frame(
305 &mut stream,
306 &plain.pack(&req1).to_plaintext_bytes(),
307 &frame_kind,
308 )
309 .await?;
310 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
311
312 let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
313 .map_err(|e| ConnectError::other(e.to_string()))?;
314 send_frame(
315 &mut stream,
316 &plain.pack(&req2).to_plaintext_bytes(),
317 &frame_kind,
318 )
319 .await?;
320 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
321
322 let (req3, s3) = auth::step3(s2, dh).map_err(|e| ConnectError::other(e.to_string()))?;
323 send_frame(
324 &mut stream,
325 &plain.pack(&req3).to_plaintext_bytes(),
326 &frame_kind,
327 )
328 .await?;
329 let ans: tl::enums::SetClientDhParamsAnswer =
330 recv_frame_plain(&mut stream, &frame_kind).await?;
331
332 let done = {
334 let mut result =
335 auth::finish(s3, ans).map_err(|e| ConnectError::other(e.to_string()))?;
336 let mut attempts = 0u8;
337 loop {
338 match result {
339 auth::FinishResult::Done(d) => break d,
340 auth::FinishResult::Retry {
341 retry_id,
342 dh_params,
343 nonce,
344 server_nonce,
345 new_nonce,
346 } => {
347 attempts += 1;
348 if attempts >= 5 {
349 return Err(ConnectError::other(
350 "dh_gen_retry exceeded 5 attempts",
351 ));
352 }
353 let (req_retry, s3_retry) = auth::retry_step3(
354 &dh_params,
355 nonce,
356 server_nonce,
357 new_nonce,
358 retry_id,
359 )
360 .map_err(|e| ConnectError::other(e.to_string()))?;
361 send_frame(
362 &mut stream,
363 &plain.pack(&req_retry).to_plaintext_bytes(),
364 &frame_kind,
365 )
366 .await?;
367 let ans_retry: tl::enums::SetClientDhParamsAnswer =
368 recv_frame_plain(&mut stream, &frame_kind).await?;
369 result = auth::finish(s3_retry, ans_retry)
370 .map_err(|e| ConnectError::other(e.to_string()))?;
371 }
372 }
373 }
374 };
375 tracing::debug!("[ferogram::connect] DH handshake complete, auth key established");
376
377 Ok::<Self, ConnectError>(Self {
378 stream,
379 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
380 frame_kind,
381 perm_auth_key: None, })
383 };
384
385 tokio::time::timeout(Duration::from_secs(15), fut)
386 .await
387 .map_err(|_| {
388 ConnectError::other(format!("DH handshake with {addr} timed out after 15 s"))
389 })?
390 }
391
392 #[allow(clippy::too_many_arguments)]
397 pub async fn connect_with_key(
398 addr: &str,
399 auth_key: [u8; 256],
400 first_salt: i64,
401 time_offset: i32,
402 socks5: Option<&crate::socks5::Socks5Config>,
403 mtproxy: Option<&crate::proxy::MtProxyConfig>,
404 transport: &TransportKind,
405 dc_id: i16,
406 pfs: bool,
407 ) -> Result<Self, ConnectError> {
408 let addr2 = addr.to_string();
409 let socks5_c = socks5.cloned();
410 let mtproxy_c = mtproxy.cloned();
411 let transport_c = transport.clone();
412
413 let fut = async move {
414 let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
415 Self::open_stream_mtproxy(mp, dc_id).await?
416 } else {
417 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
418 };
419 if pfs {
420 tracing::debug!("[ferogram::connect] PFS: binding temporary key for DC{dc_id}");
421 match Self::do_pfs_bind(&mut stream, &frame_kind, &auth_key, dc_id).await {
422 Ok(temp_enc) => {
423 tracing::debug!(
424 "[ferogram::connect] PFS: temporary key bound for DC{dc_id}"
425 );
426 return Ok(Self {
427 stream,
428 enc: temp_enc,
429 frame_kind,
430 perm_auth_key: Some(auth_key),
431 });
432 }
433 Err(e) => {
434 tracing::warn!(
435 "[ferogram::connect] PFS bind failed for DC{dc_id} ({e}); falling back to permanent key"
436 );
437 return Err(e);
440 }
441 }
442 }
443 Ok::<Self, ConnectError>(Self {
444 stream,
445 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
446 frame_kind,
447 perm_auth_key: None,
448 })
449 };
450
451 tokio::time::timeout(Duration::from_secs(30), fut)
452 .await
453 .map_err(|_| {
454 ConnectError::other(format!("connect_with_key to {addr} timed out after 30 s"))
455 })?
456 }
457
458 async fn do_pfs_bind(
462 stream: &mut TcpStream,
463 frame_kind: &FrameKind,
464 perm_auth_key: &[u8; 256],
465 dc_id: i16,
466 ) -> Result<EncryptedSession, ConnectError> {
467 use ferogram_mtproto::{
468 auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
469 serialize_bind_temp_auth_key,
470 };
471 const TEMP_EXPIRES: i32 = 86_400; let mut plain = Session::new();
475
476 let (req1, s1) = auth::step1().map_err(|e| ConnectError::other(e.to_string()))?;
477 send_frame(stream, &plain.pack(&req1).to_plaintext_bytes(), frame_kind).await?;
478 let res_pq: tl::enums::ResPq = recv_frame_plain(stream, frame_kind).await?;
479
480 let (req2, s2) = ferogram_mtproto::step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
481 .map_err(|e| ConnectError::other(e.to_string()))?;
482 send_frame(stream, &plain.pack(&req2).to_plaintext_bytes(), frame_kind).await?;
483 let dh: tl::enums::ServerDhParams = recv_frame_plain(stream, frame_kind).await?;
484
485 let (req3, s3) = auth::step3(s2, dh).map_err(|e| ConnectError::other(e.to_string()))?;
486 send_frame(stream, &plain.pack(&req3).to_plaintext_bytes(), frame_kind).await?;
487 let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(stream, frame_kind).await?;
488
489 let done = {
490 let mut result =
491 auth::finish(s3, ans).map_err(|e| ConnectError::other(e.to_string()))?;
492 let mut attempts = 0u8;
493 loop {
494 match result {
495 ferogram_mtproto::FinishResult::Done(d) => break d,
496 ferogram_mtproto::FinishResult::Retry {
497 retry_id,
498 dh_params,
499 nonce,
500 server_nonce,
501 new_nonce,
502 } => {
503 attempts += 1;
504 if attempts >= 5 {
505 return Err(ConnectError::other(
506 "PFS temp DH retry exceeded 5 attempts",
507 ));
508 }
509 let (rr, s3r) = ferogram_mtproto::retry_step3(
510 &dh_params,
511 nonce,
512 server_nonce,
513 new_nonce,
514 retry_id,
515 )
516 .map_err(|e| ConnectError::other(e.to_string()))?;
517 send_frame(stream, &plain.pack(&rr).to_plaintext_bytes(), frame_kind)
518 .await?;
519 let ar: tl::enums::SetClientDhParamsAnswer =
520 recv_frame_plain(stream, frame_kind).await?;
521 result = auth::finish(s3r, ar)
522 .map_err(|e| ConnectError::other(e.to_string()))?;
523 }
524 }
525 }
526 };
527
528 let temp_key = done.auth_key;
529 let temp_salt = done.first_salt;
530 let temp_offset = done.time_offset;
531
532 let temp_key_id = auth_key_id_from_key(&temp_key);
534 let perm_key_id = auth_key_id_from_key(perm_auth_key);
535
536 let mut nonce_buf = [0u8; 8];
537 ferogram_crypto::fill_random(&mut nonce_buf);
538 let nonce = i64::from_le_bytes(nonce_buf);
539
540 let server_now = std::time::SystemTime::now()
541 .duration_since(std::time::UNIX_EPOCH)
542 .unwrap()
543 .as_secs() as i32
544 + temp_offset;
545 let expires_at = server_now + TEMP_EXPIRES;
546
547 let seen = new_seen_msg_ids();
548 let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
549 let temp_session_id = temp_enc.session_id();
550
551 let msg_id = gen_msg_id();
552 let enc_msg = encrypt_bind_inner(
553 perm_auth_key,
554 msg_id,
555 nonce,
556 temp_key_id,
557 perm_key_id,
558 temp_session_id,
559 expires_at,
560 );
561 let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
562
563 let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
565 send_frame(stream, &wire, frame_kind).await?;
566
567 for attempt in 0u8..5 {
571 let mut raw = recv_raw_frame(stream, frame_kind).await?;
572 let decrypted = temp_enc
573 .unpack(&mut raw)
574 .map_err(|e| ConnectError::other(format!("PFS bind decrypt: {e:?}")))?;
575 match decode_bind_response(&decrypted.body) {
576 Ok(()) => {
577 return Ok(temp_enc);
581 }
582 Err(ref e) if e == "__need_more__" => {
583 tracing::debug!(
584 "[ferogram::connect] PFS (DC{dc_id}): got informational frame on attempt {attempt}, reading next"
585 );
586 continue;
587 }
588 Err(reason) => {
589 tracing::error!(
590 "[ferogram::connect] PFS bind rejected by server for DC{dc_id}: {reason}"
591 );
592 return Err(ConnectError::other(format!(
593 "auth.bindTempAuthKey: {reason}"
594 )));
595 }
596 }
597 }
598 Err(ConnectError::other(
599 "auth.bindTempAuthKey: no boolTrue after 5 frames",
600 ))
601 }
602
603 pub fn auth_key_bytes(&self) -> [u8; 256] {
607 self.perm_auth_key
610 .unwrap_or_else(|| self.enc.auth_key_bytes())
611 }
612
613 pub async fn connect_to_dc(
619 addr: &str,
620 dc_id: i16,
621 transport: &TransportKind,
622 socks5: Option<&crate::socks5::Socks5Config>,
623 mtproxy: Option<&crate::proxy::MtProxyConfig>,
624 ) -> Result<(TcpStream, FrameKind, EncryptedSession), ConnectError> {
625 let conn = Self::connect_raw(addr, socks5, mtproxy, transport, dc_id).await?;
626 Ok((conn.stream, conn.frame_kind, conn.enc))
627 }
628}
629
630pub async fn connect_to_dc(
635 addr: &str,
636 dc_id: i16,
637 transport: &TransportKind,
638 socks5: Option<&crate::socks5::Socks5Config>,
639 mtproxy: Option<&crate::proxy::MtProxyConfig>,
640) -> Result<(TcpStream, FrameKind, EncryptedSession), ConnectError> {
641 Connection::connect_to_dc(addr, dc_id, transport, socks5, mtproxy).await
642}