1use std::sync::Arc;
14use std::time::Duration;
15
16use socket2::TcpKeepalive;
17use tokio::io::AsyncWriteExt;
18use tokio::net::TcpStream;
19
20use ferogram_mtproto::{EncryptedSession, Session, authentication as auth};
21use ferogram_tl_types as tl;
22
23use crate::error::ConnectError;
24use crate::frame::{recv_frame_plain, send_frame};
25use crate::pfs::decode_bind_response;
26use crate::transport::recv_raw_frame;
27use crate::transport_kind::TransportKind;
28
29pub const PING_DELAY_SECS: u64 = 60;
30pub const NO_PING_DISCONNECT: i32 = 75;
31
32const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
33const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
34#[cfg(not(target_os = "windows"))]
35const TCP_KEEPALIVE_PROBES: u32 = 3;
36
37#[derive(Clone)]
45pub enum FrameKind {
46 Abridged,
47 Intermediate,
48 Full {
49 send_seqno: Arc<std::sync::atomic::AtomicU32>,
50 recv_seqno: Arc<std::sync::atomic::AtomicU32>,
51 },
52 Obfuscated {
54 cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
55 },
56 PaddedIntermediate {
58 cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
59 },
60 FakeTls {
62 cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
63 },
64}
65
66#[derive(Clone, Debug)]
69pub struct FutureSalt {
70 pub valid_since: i32,
71 pub valid_until: u32,
76 pub salt: i64,
77}
78
79pub const SALT_USE_DELAY: i32 = 60;
82
83pub struct Connection {
84 pub stream: TcpStream,
85 pub enc: EncryptedSession,
86 pub frame_kind: FrameKind,
87 pub perm_auth_key: Option<[u8; 256]>,
91}
92
93impl Connection {
94 async fn open_stream(
96 addr: &str,
97 socks5: Option<&crate::socks5::Socks5Config>,
98 transport: &TransportKind,
99 dc_id: i16,
100 ) -> Result<(TcpStream, FrameKind), ConnectError> {
101 let stream = match socks5 {
102 Some(proxy) => proxy.connect(addr).await?,
103 None => {
104 let stream = TcpStream::connect(addr).await.map_err(ConnectError::Io)?;
105 stream.set_nodelay(true).ok();
106 {
107 let sock = socket2::SockRef::from(&stream);
108 let keepalive = TcpKeepalive::new()
109 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
110 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
111 #[cfg(not(target_os = "windows"))]
112 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
113 sock.set_tcp_keepalive(&keepalive).ok();
114 }
115 stream
116 }
117 };
118 Self::apply_transport_init(stream, transport, dc_id).await
119 }
120
121 async fn open_stream_mtproxy(
124 mtproxy: &crate::proxy::MtProxyConfig,
125 dc_id: i16,
126 ) -> Result<(TcpStream, FrameKind), ConnectError> {
127 let stream = mtproxy.connect().await?;
128 stream.set_nodelay(true).ok();
129 Self::apply_transport_init(stream, &mtproxy.transport, dc_id).await
130 }
131
132 async fn apply_transport_init(
133 mut stream: TcpStream,
134 transport: &TransportKind,
135 dc_id: i16,
136 ) -> Result<(TcpStream, FrameKind), ConnectError> {
137 match transport {
138 TransportKind::Abridged => {
139 stream.write_all(&[0xef]).await?;
140 Ok((stream, FrameKind::Abridged))
141 }
142 TransportKind::Intermediate => {
143 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
144 Ok((stream, FrameKind::Intermediate))
145 }
146 TransportKind::Full => {
147 Ok((
149 stream,
150 FrameKind::Full {
151 send_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
152 recv_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
153 },
154 ))
155 }
156 TransportKind::Obfuscated { secret } => {
157 let proxy_secret = secret.as_ref().map(|s| s.as_ref());
158 let (nonce, cipher) =
159 ferogram_crypto::build_obfuscated_init(0xef, dc_id, proxy_secret);
160 stream.write_all(&nonce).await?;
161 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
162 Ok((stream, FrameKind::Obfuscated { cipher: cipher_arc }))
163 }
164 TransportKind::PaddedIntermediate { secret } => {
165 let proxy_secret = secret.as_ref().map(|s| s.as_ref());
166 let (nonce, cipher) =
167 ferogram_crypto::build_obfuscated_init(0xdd, dc_id, proxy_secret);
168 stream.write_all(&nonce).await?;
169 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
170 Ok((stream, FrameKind::PaddedIntermediate { cipher: cipher_arc }))
171 }
172 TransportKind::FakeTls { secret, domain } => {
173 let domain_bytes = domain.as_bytes();
177 let mut session_id = [0u8; 32];
178 ferogram_crypto::fill_random(&mut session_id);
179
180 let cipher_suites: &[u8] = &[0x00, 0x04, 0x13, 0x01, 0x13, 0x02];
182 let compression: &[u8] = &[0x01, 0x00];
183 let sni_name_len = domain_bytes.len() as u16;
184 let sni_list_len = sni_name_len + 3;
185 let sni_ext_len = sni_list_len + 2;
186 let mut sni_ext = Vec::new();
187 sni_ext.extend_from_slice(&[0x00, 0x00]);
188 sni_ext.extend_from_slice(&sni_ext_len.to_be_bytes());
189 sni_ext.extend_from_slice(&sni_list_len.to_be_bytes());
190 sni_ext.push(0x00);
191 sni_ext.extend_from_slice(&sni_name_len.to_be_bytes());
192 sni_ext.extend_from_slice(domain_bytes);
193 let sup_ver: &[u8] = &[0x00, 0x2b, 0x00, 0x03, 0x02, 0x03, 0x04];
194 let sup_grp: &[u8] = &[0x00, 0x0a, 0x00, 0x04, 0x00, 0x02, 0x00, 0x1d];
195 let sess_tick: &[u8] = &[0x00, 0x23, 0x00, 0x00];
196 let ext_body_len = sni_ext.len() + sup_ver.len() + sup_grp.len() + sess_tick.len();
197 let mut extensions = Vec::new();
198 extensions.extend_from_slice(&(ext_body_len as u16).to_be_bytes());
199 extensions.extend_from_slice(&sni_ext);
200 extensions.extend_from_slice(sup_ver);
201 extensions.extend_from_slice(sup_grp);
202 extensions.extend_from_slice(sess_tick);
203
204 let mut hello_body = Vec::new();
205 hello_body.extend_from_slice(&[0x03, 0x03]);
206 hello_body.extend_from_slice(&[0u8; 32]); hello_body.push(session_id.len() as u8);
208 hello_body.extend_from_slice(&session_id);
209 hello_body.extend_from_slice(cipher_suites);
210 hello_body.extend_from_slice(compression);
211 hello_body.extend_from_slice(&extensions);
212
213 let hs_len = hello_body.len() as u32;
214 let mut handshake = vec![
215 0x01,
216 ((hs_len >> 16) & 0xff) as u8,
217 ((hs_len >> 8) & 0xff) as u8,
218 (hs_len & 0xff) as u8,
219 ];
220 handshake.extend_from_slice(&hello_body);
221
222 let rec_len = handshake.len() as u16;
223 let mut record = Vec::new();
224 record.push(0x16);
225 record.extend_from_slice(&[0x03, 0x01]);
226 record.extend_from_slice(&rec_len.to_be_bytes());
227 record.extend_from_slice(&handshake);
228
229 let random_offset = 5 + 4 + 2; let (hmac_result, cipher) = ferogram_crypto::build_fake_tls_keys(secret, &record);
235 record[random_offset..random_offset + 32].copy_from_slice(&hmac_result);
236 stream.write_all(&record).await?;
237
238 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
239 Ok((stream, FrameKind::FakeTls { cipher: cipher_arc }))
240 }
241 TransportKind::Http => {
242 stream.write_all(&[0xef]).await?;
244 Ok((stream, FrameKind::Abridged))
245 }
246 }
247 }
248
249 pub async fn open_stream_pub(
254 addr: &str,
255 dc_id: i16,
256 transport: &TransportKind,
257 socks5: Option<&crate::socks5::Socks5Config>,
258 mtproxy: Option<&crate::proxy::MtProxyConfig>,
259 ) -> Result<(TcpStream, FrameKind), ConnectError> {
260 if let Some(mp) = mtproxy {
261 Self::open_stream_mtproxy(mp, dc_id).await
262 } else {
263 Self::open_stream(addr, socks5, transport, dc_id).await
264 }
265 }
266
267 pub async fn connect_raw(
268 addr: &str,
269 socks5: Option<&crate::socks5::Socks5Config>,
270 mtproxy: Option<&crate::proxy::MtProxyConfig>,
271 transport: &TransportKind,
272 dc_id: i16,
273 ) -> Result<Self, ConnectError> {
274 let t_label = match transport {
275 TransportKind::Abridged => "Abridged",
276 TransportKind::Obfuscated { .. } => "Obfuscated",
277 TransportKind::PaddedIntermediate { .. } => "PaddedIntermediate",
278 TransportKind::Http => "Http",
279 TransportKind::Intermediate => "Intermediate",
280 TransportKind::Full => "Full",
281 TransportKind::FakeTls { .. } => "FakeTls",
282 };
283 tracing::debug!("[ferogram::connect] starting DH handshake with {addr} via {t_label}");
284
285 let addr2 = addr.to_string();
286 let socks5_c = socks5.cloned();
287 let mtproxy_c = mtproxy.cloned();
288 let transport_c = transport.clone();
289
290 let fut = async move {
291 let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
292 Self::open_stream_mtproxy(mp, dc_id).await?
293 } else {
294 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
295 };
296
297 let mut plain = Session::new();
298
299 let (req1, s1) = auth::step1().map_err(|e| ConnectError::other(e.to_string()))?;
300 send_frame(
301 &mut stream,
302 &plain.pack(&req1).to_plaintext_bytes(),
303 &frame_kind,
304 )
305 .await?;
306 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
307
308 let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
309 .map_err(|e| ConnectError::other(e.to_string()))?;
310 send_frame(
311 &mut stream,
312 &plain.pack(&req2).to_plaintext_bytes(),
313 &frame_kind,
314 )
315 .await?;
316 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
317
318 let (req3, s3) = auth::step3(s2, dh).map_err(|e| ConnectError::other(e.to_string()))?;
319 send_frame(
320 &mut stream,
321 &plain.pack(&req3).to_plaintext_bytes(),
322 &frame_kind,
323 )
324 .await?;
325 let ans: tl::enums::SetClientDhParamsAnswer =
326 recv_frame_plain(&mut stream, &frame_kind).await?;
327
328 let done = {
330 let mut result =
331 auth::finish(s3, ans).map_err(|e| ConnectError::other(e.to_string()))?;
332 let mut attempts = 0u8;
333 loop {
334 match result {
335 auth::FinishResult::Done(d) => break d,
336 auth::FinishResult::Retry {
337 retry_id,
338 dh_params,
339 nonce,
340 server_nonce,
341 new_nonce,
342 } => {
343 attempts += 1;
344 if attempts >= 5 {
345 return Err(ConnectError::other(
346 "dh_gen_retry exceeded 5 attempts",
347 ));
348 }
349 let (req_retry, s3_retry) = auth::retry_step3(
350 &dh_params,
351 nonce,
352 server_nonce,
353 new_nonce,
354 retry_id,
355 )
356 .map_err(|e| ConnectError::other(e.to_string()))?;
357 send_frame(
358 &mut stream,
359 &plain.pack(&req_retry).to_plaintext_bytes(),
360 &frame_kind,
361 )
362 .await?;
363 let ans_retry: tl::enums::SetClientDhParamsAnswer =
364 recv_frame_plain(&mut stream, &frame_kind).await?;
365 result = auth::finish(s3_retry, ans_retry)
366 .map_err(|e| ConnectError::other(e.to_string()))?;
367 }
368 }
369 }
370 };
371 tracing::debug!("[ferogram::connect] DH handshake complete, auth key established");
372
373 Ok::<Self, ConnectError>(Self {
374 stream,
375 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
376 frame_kind,
377 perm_auth_key: None, })
379 };
380
381 tokio::time::timeout(Duration::from_secs(15), fut)
382 .await
383 .map_err(|_| {
384 ConnectError::other(format!("DH handshake with {addr} timed out after 15 s"))
385 })?
386 }
387
388 #[allow(clippy::too_many_arguments)]
389 pub async fn connect_with_key(
390 addr: &str,
391 auth_key: [u8; 256],
392 first_salt: i64,
393 time_offset: i32,
394 socks5: Option<&crate::socks5::Socks5Config>,
395 mtproxy: Option<&crate::proxy::MtProxyConfig>,
396 transport: &TransportKind,
397 dc_id: i16,
398 pfs: bool,
399 ) -> Result<Self, ConnectError> {
400 let addr2 = addr.to_string();
401 let socks5_c = socks5.cloned();
402 let mtproxy_c = mtproxy.cloned();
403 let transport_c = transport.clone();
404
405 let fut = async move {
406 let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
407 Self::open_stream_mtproxy(mp, dc_id).await?
408 } else {
409 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
410 };
411 if pfs {
412 tracing::debug!("[ferogram::connect] PFS: binding temporary key for DC{dc_id}");
413 match Self::do_pfs_bind(&mut stream, &frame_kind, &auth_key, dc_id).await {
414 Ok(temp_enc) => {
415 tracing::debug!(
416 "[ferogram::connect] PFS: temporary key bound for DC{dc_id}"
417 );
418 return Ok(Self {
419 stream,
420 enc: temp_enc,
421 frame_kind,
422 perm_auth_key: Some(auth_key),
423 });
424 }
425 Err(e) => {
426 tracing::warn!(
427 "[ferogram::connect] PFS bind failed for DC{dc_id} ({e}); falling back to permanent key"
428 );
429 return Err(e);
432 }
433 }
434 }
435 Ok::<Self, ConnectError>(Self {
436 stream,
437 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
438 frame_kind,
439 perm_auth_key: None,
440 })
441 };
442
443 tokio::time::timeout(Duration::from_secs(30), fut)
444 .await
445 .map_err(|_| {
446 ConnectError::other(format!("connect_with_key to {addr} timed out after 30 s"))
447 })?
448 }
449
450 async fn do_pfs_bind(
454 stream: &mut TcpStream,
455 frame_kind: &FrameKind,
456 perm_auth_key: &[u8; 256],
457 dc_id: i16,
458 ) -> Result<EncryptedSession, ConnectError> {
459 use ferogram_mtproto::{
460 auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
461 serialize_bind_temp_auth_key,
462 };
463 const TEMP_EXPIRES: i32 = 86_400; let mut plain = Session::new();
467
468 let (req1, s1) = auth::step1().map_err(|e| ConnectError::other(e.to_string()))?;
469 send_frame(stream, &plain.pack(&req1).to_plaintext_bytes(), frame_kind).await?;
470 let res_pq: tl::enums::ResPq = recv_frame_plain(stream, frame_kind).await?;
471
472 let (req2, s2) = ferogram_mtproto::step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
473 .map_err(|e| ConnectError::other(e.to_string()))?;
474 send_frame(stream, &plain.pack(&req2).to_plaintext_bytes(), frame_kind).await?;
475 let dh: tl::enums::ServerDhParams = recv_frame_plain(stream, frame_kind).await?;
476
477 let (req3, s3) = auth::step3(s2, dh).map_err(|e| ConnectError::other(e.to_string()))?;
478 send_frame(stream, &plain.pack(&req3).to_plaintext_bytes(), frame_kind).await?;
479 let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(stream, frame_kind).await?;
480
481 let done = {
482 let mut result =
483 auth::finish(s3, ans).map_err(|e| ConnectError::other(e.to_string()))?;
484 let mut attempts = 0u8;
485 loop {
486 match result {
487 ferogram_mtproto::FinishResult::Done(d) => break d,
488 ferogram_mtproto::FinishResult::Retry {
489 retry_id,
490 dh_params,
491 nonce,
492 server_nonce,
493 new_nonce,
494 } => {
495 attempts += 1;
496 if attempts >= 5 {
497 return Err(ConnectError::other(
498 "PFS temp DH retry exceeded 5 attempts",
499 ));
500 }
501 let (rr, s3r) = ferogram_mtproto::retry_step3(
502 &dh_params,
503 nonce,
504 server_nonce,
505 new_nonce,
506 retry_id,
507 )
508 .map_err(|e| ConnectError::other(e.to_string()))?;
509 send_frame(stream, &plain.pack(&rr).to_plaintext_bytes(), frame_kind)
510 .await?;
511 let ar: tl::enums::SetClientDhParamsAnswer =
512 recv_frame_plain(stream, frame_kind).await?;
513 result = auth::finish(s3r, ar)
514 .map_err(|e| ConnectError::other(e.to_string()))?;
515 }
516 }
517 }
518 };
519
520 let temp_key = done.auth_key;
521 let temp_salt = done.first_salt;
522 let temp_offset = done.time_offset;
523
524 let temp_key_id = auth_key_id_from_key(&temp_key);
526 let perm_key_id = auth_key_id_from_key(perm_auth_key);
527
528 let mut nonce_buf = [0u8; 8];
529 ferogram_crypto::fill_random(&mut nonce_buf);
530 let nonce = i64::from_le_bytes(nonce_buf);
531
532 let server_now = std::time::SystemTime::now()
533 .duration_since(std::time::UNIX_EPOCH)
534 .unwrap()
535 .as_secs() as i32
536 + temp_offset;
537 let expires_at = server_now + TEMP_EXPIRES;
538
539 let seen = new_seen_msg_ids();
540 let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
541 let temp_session_id = temp_enc.session_id();
542
543 let msg_id = gen_msg_id();
544 let enc_msg = encrypt_bind_inner(
545 perm_auth_key,
546 msg_id,
547 nonce,
548 temp_key_id,
549 perm_key_id,
550 temp_session_id,
551 expires_at,
552 );
553 let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
554
555 let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
557 send_frame(stream, &wire, frame_kind).await?;
558
559 for attempt in 0u8..5 {
563 let mut raw = recv_raw_frame(stream, frame_kind).await?;
564 let decrypted = temp_enc
565 .unpack(&mut raw)
566 .map_err(|e| ConnectError::other(format!("PFS bind decrypt: {e:?}")))?;
567 match decode_bind_response(&decrypted.body) {
568 Ok(()) => {
569 return Ok(temp_enc);
573 }
574 Err(ref e) if e == "__need_more__" => {
575 tracing::debug!(
576 "[ferogram::connect] PFS (DC{dc_id}): got informational frame on attempt {attempt}, reading next"
577 );
578 continue;
579 }
580 Err(reason) => {
581 tracing::error!(
582 "[ferogram::connect] PFS bind rejected by server for DC{dc_id}: {reason}"
583 );
584 return Err(ConnectError::other(format!(
585 "auth.bindTempAuthKey: {reason}"
586 )));
587 }
588 }
589 }
590 Err(ConnectError::other(
591 "auth.bindTempAuthKey: no boolTrue after 5 frames",
592 ))
593 }
594
595 pub fn auth_key_bytes(&self) -> [u8; 256] {
596 self.perm_auth_key
599 .unwrap_or_else(|| self.enc.auth_key_bytes())
600 }
601
602 pub async fn connect_to_dc(
608 addr: &str,
609 dc_id: i16,
610 transport: &TransportKind,
611 socks5: Option<&crate::socks5::Socks5Config>,
612 mtproxy: Option<&crate::proxy::MtProxyConfig>,
613 ) -> Result<(TcpStream, FrameKind, EncryptedSession), ConnectError> {
614 let conn = Self::connect_raw(addr, socks5, mtproxy, transport, dc_id).await?;
615 Ok((conn.stream, conn.frame_kind, conn.enc))
616 }
617}
618
619pub async fn connect_to_dc(
624 addr: &str,
625 dc_id: i16,
626 transport: &TransportKind,
627 socks5: Option<&crate::socks5::Socks5Config>,
628 mtproxy: Option<&crate::proxy::MtProxyConfig>,
629) -> Result<(TcpStream, FrameKind, EncryptedSession), ConnectError> {
630 Connection::connect_to_dc(addr, dc_id, transport, socks5, mtproxy).await
631}