Skip to main content

ferogram_connect/
connection.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13use std::sync::Arc;
14use std::time::Duration;
15
16use socket2::TcpKeepalive;
17use tokio::io::AsyncWriteExt;
18use tokio::net::TcpStream;
19
20use ferogram_mtproto::{EncryptedSession, Session, authentication as auth};
21use ferogram_tl_types as tl;
22
23use crate::error::ConnectError;
24use crate::frame::{recv_frame_plain, send_frame};
25use crate::pfs::decode_bind_response;
26use crate::transport::recv_raw_frame;
27use crate::transport_kind::TransportKind;
28
/// Ping delay in seconds. Not referenced in this file.
/// NOTE(review): presumably the interval between keep-alive pings issued by
/// the sender layer — confirm against the caller.
pub const PING_DELAY_SECS: u64 = 60;
/// Not referenced in this file.
/// NOTE(review): presumably the server-side disconnect delay (seconds) sent
/// along with each ping — confirm against the caller.
pub const NO_PING_DISCONNECT: i32 = 75;

/// Seconds passed to `TcpKeepalive::with_time` for direct connections.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
/// Seconds passed to `TcpKeepalive::with_interval` for direct connections.
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
/// Probe count passed to `TcpKeepalive::with_retries`; only compiled (and
/// only applied) on non-Windows targets.
#[cfg(not(target_os = "windows"))]
const TCP_KEEPALIVE_PROBES: u32 = 3;
36
37/// How framing bytes are sent/received on a connection.
38///
39/// `Obfuscated` carries an `Arc<Mutex<ObfuscatedCipher>>` so the same cipher
40/// state is shared (safely) between the writer task (TX / `encrypt`) and the
41/// reader task (RX / `decrypt`).  The two directions are separate AES-CTR
42/// instances inside `ObfuscatedCipher`, so locking is only needed to prevent
43/// concurrent mutation of the struct, not to serialise TX vs RX.
44#[derive(Clone)]
45pub enum FrameKind {
46    Abridged,
47    Intermediate,
48    Full {
49        send_seqno: Arc<std::sync::atomic::AtomicU32>,
50        recv_seqno: Arc<std::sync::atomic::AtomicU32>,
51    },
52    /// Obfuscated2 over Abridged framing.
53    Obfuscated {
54        cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
55    },
56    /// Obfuscated2 over Intermediate+padding framing (`0xDD` MTProxy).
57    PaddedIntermediate {
58        cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
59    },
60    /// FakeTLS framing (`0xEE` MTProxy).
61    FakeTls {
62        cipher: std::sync::Arc<tokio::sync::Mutex<ferogram_crypto::ObfuscatedCipher>>,
63    },
64}
65
/// A single server-provided salt with its validity window.
#[derive(Clone, Debug)]
pub struct FutureSalt {
    /// Start of the validity window (signed 32-bit timestamp).
    pub valid_since: i32,
    /// Stored as `u32` because Telegram sends validity windows that extend
    /// past 2038 (e.g. valid_until ≈ 2_751_656_413, year 2057).  Those values
    /// overflow `i32` and wrap negative, making every salt look expired when
    /// compared against the current server time with a signed comparison.
    pub valid_until: u32,
    /// The salt value itself.
    pub salt: i64,
}
78
/// Delay (seconds) before a salt is considered usable after its `valid_since`.
///
/// Not referenced in this file; NOTE(review): presumably consumed by the salt
/// rotation logic alongside [`FutureSalt`] — confirm against the caller.
pub const SALT_USE_DELAY: i32 = 60;
82
/// An open transport stream together with the MTProto encryption state
/// negotiated (or supplied) for it.
pub struct Connection {
    /// The TCP stream; transport init bytes have already been written to it
    /// by the time a `Connection` is constructed in this file.
    pub stream: TcpStream,
    /// Encrypted session state. Holds the temporary key when PFS is active,
    /// otherwise the permanent key.
    pub enc: EncryptedSession,
    /// Framing selected for `stream` during transport init.
    pub frame_kind: FrameKind,
    /// When PFS is active, the permanent auth key (stored in session).
    /// `enc` holds the temp key; this field holds the perm key so
    /// `auth_key_bytes()` returns the right value to persist.
    /// `None` when the connection is encrypted with the permanent key itself.
    pub perm_auth_key: Option<[u8; 256]>,
}
92
93impl Connection {
94    /// Open a TCP stream, optionally via SOCKS5, and apply transport init bytes.
95    async fn open_stream(
96        addr: &str,
97        socks5: Option<&crate::socks5::Socks5Config>,
98        transport: &TransportKind,
99        dc_id: i16,
100    ) -> Result<(TcpStream, FrameKind), ConnectError> {
101        let stream = match socks5 {
102            Some(proxy) => proxy.connect(addr).await?,
103            None => {
104                let stream = TcpStream::connect(addr).await.map_err(ConnectError::Io)?;
105                stream.set_nodelay(true).ok();
106                {
107                    let sock = socket2::SockRef::from(&stream);
108                    let keepalive = TcpKeepalive::new()
109                        .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
110                        .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
111                    #[cfg(not(target_os = "windows"))]
112                    let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
113                    sock.set_tcp_keepalive(&keepalive).ok();
114                }
115                stream
116            }
117        };
118        Self::apply_transport_init(stream, transport, dc_id).await
119    }
120
121    /// Open a stream routed through an MTProxy (connects to proxy host:port,
122    /// not to the Telegram DC address).
123    async fn open_stream_mtproxy(
124        mtproxy: &crate::proxy::MtProxyConfig,
125        dc_id: i16,
126    ) -> Result<(TcpStream, FrameKind), ConnectError> {
127        let stream = mtproxy.connect().await?;
128        stream.set_nodelay(true).ok();
129        Self::apply_transport_init(stream, &mtproxy.transport, dc_id).await
130    }
131
132    async fn apply_transport_init(
133        mut stream: TcpStream,
134        transport: &TransportKind,
135        dc_id: i16,
136    ) -> Result<(TcpStream, FrameKind), ConnectError> {
137        match transport {
138            TransportKind::Abridged => {
139                stream.write_all(&[0xef]).await?;
140                Ok((stream, FrameKind::Abridged))
141            }
142            TransportKind::Intermediate => {
143                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
144                Ok((stream, FrameKind::Intermediate))
145            }
146            TransportKind::Full => {
147                // Full transport has no init byte.
148                Ok((
149                    stream,
150                    FrameKind::Full {
151                        send_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
152                        recv_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
153                    },
154                ))
155            }
156            TransportKind::Obfuscated { secret } => {
157                let proxy_secret = secret.as_ref().map(|s| s.as_ref());
158                let (nonce, cipher) =
159                    ferogram_crypto::build_obfuscated_init(0xef, dc_id, proxy_secret);
160                stream.write_all(&nonce).await?;
161                let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
162                Ok((stream, FrameKind::Obfuscated { cipher: cipher_arc }))
163            }
164            TransportKind::PaddedIntermediate { secret } => {
165                let proxy_secret = secret.as_ref().map(|s| s.as_ref());
166                let (nonce, cipher) =
167                    ferogram_crypto::build_obfuscated_init(0xdd, dc_id, proxy_secret);
168                stream.write_all(&nonce).await?;
169                let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
170                Ok((stream, FrameKind::PaddedIntermediate { cipher: cipher_arc }))
171            }
172            TransportKind::FakeTls { secret, domain } => {
173                // Fake TLS 1.3 ClientHello with HMAC-SHA256 random field.
174                // After the handshake, data flows as TLS Application Data records
175                // over a shared Obfuscated2 cipher seeded from the secret+HMAC.
176                let domain_bytes = domain.as_bytes();
177                let mut session_id = [0u8; 32];
178                ferogram_crypto::fill_random(&mut session_id);
179
180                // Build ClientHello body (random placeholder = zeros)
181                let cipher_suites: &[u8] = &[0x00, 0x04, 0x13, 0x01, 0x13, 0x02];
182                let compression: &[u8] = &[0x01, 0x00];
183                let sni_name_len = domain_bytes.len() as u16;
184                let sni_list_len = sni_name_len + 3;
185                let sni_ext_len = sni_list_len + 2;
186                let mut sni_ext = Vec::new();
187                sni_ext.extend_from_slice(&[0x00, 0x00]);
188                sni_ext.extend_from_slice(&sni_ext_len.to_be_bytes());
189                sni_ext.extend_from_slice(&sni_list_len.to_be_bytes());
190                sni_ext.push(0x00);
191                sni_ext.extend_from_slice(&sni_name_len.to_be_bytes());
192                sni_ext.extend_from_slice(domain_bytes);
193                let sup_ver: &[u8] = &[0x00, 0x2b, 0x00, 0x03, 0x02, 0x03, 0x04];
194                let sup_grp: &[u8] = &[0x00, 0x0a, 0x00, 0x04, 0x00, 0x02, 0x00, 0x1d];
195                let sess_tick: &[u8] = &[0x00, 0x23, 0x00, 0x00];
196                let ext_body_len = sni_ext.len() + sup_ver.len() + sup_grp.len() + sess_tick.len();
197                let mut extensions = Vec::new();
198                extensions.extend_from_slice(&(ext_body_len as u16).to_be_bytes());
199                extensions.extend_from_slice(&sni_ext);
200                extensions.extend_from_slice(sup_ver);
201                extensions.extend_from_slice(sup_grp);
202                extensions.extend_from_slice(sess_tick);
203
204                let mut hello_body = Vec::new();
205                hello_body.extend_from_slice(&[0x03, 0x03]);
206                hello_body.extend_from_slice(&[0u8; 32]); // random placeholder
207                hello_body.push(session_id.len() as u8);
208                hello_body.extend_from_slice(&session_id);
209                hello_body.extend_from_slice(cipher_suites);
210                hello_body.extend_from_slice(compression);
211                hello_body.extend_from_slice(&extensions);
212
213                let hs_len = hello_body.len() as u32;
214                let mut handshake = vec![
215                    0x01,
216                    ((hs_len >> 16) & 0xff) as u8,
217                    ((hs_len >> 8) & 0xff) as u8,
218                    (hs_len & 0xff) as u8,
219                ];
220                handshake.extend_from_slice(&hello_body);
221
222                let rec_len = handshake.len() as u16;
223                let mut record = Vec::new();
224                record.push(0x16);
225                record.extend_from_slice(&[0x03, 0x01]);
226                record.extend_from_slice(&rec_len.to_be_bytes());
227                record.extend_from_slice(&handshake);
228
229                // Derive HMAC and obfuscation cipher via ferogram-crypto.
230                // build_fake_tls_keys returns (hmac_result, cipher):
231                //   hmac_result → written into ClientHello random field at offset 11
232                //   cipher      → AES-CTR pair for all subsequent I/O on this connection
233                let random_offset = 5 + 4 + 2; // TLS-rec(5) + HS-hdr(4) + version(2)
234                let (hmac_result, cipher) = ferogram_crypto::build_fake_tls_keys(secret, &record);
235                record[random_offset..random_offset + 32].copy_from_slice(&hmac_result);
236                stream.write_all(&record).await?;
237
238                let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
239                Ok((stream, FrameKind::FakeTls { cipher: cipher_arc }))
240            }
241            TransportKind::Http => {
242                // HTTP transport is handled in dc_pool - fall back to Abridged framing.
243                stream.write_all(&[0xef]).await?;
244                Ok((stream, FrameKind::Abridged))
245            }
246        }
247    }
248
249    /// Open a TCP stream and apply transport framing, returning the stream and FrameKind.
250    ///
251    /// Used by `ferogram-mtsender` for the connect-with-key path where DH is not needed
252    /// (the auth key is already known). Socket options and transport init are handled here.
253    pub async fn open_stream_pub(
254        addr: &str,
255        dc_id: i16,
256        transport: &TransportKind,
257        socks5: Option<&crate::socks5::Socks5Config>,
258        mtproxy: Option<&crate::proxy::MtProxyConfig>,
259    ) -> Result<(TcpStream, FrameKind), ConnectError> {
260        if let Some(mp) = mtproxy {
261            Self::open_stream_mtproxy(mp, dc_id).await
262        } else {
263            Self::open_stream(addr, socks5, transport, dc_id).await
264        }
265    }
266
267    /// Open a fresh connection and run the full unauthenticated DH key
268    /// exchange to produce a brand new permanent auth key. Use
269    /// [`Self::connect_with_key`] instead once an auth key already exists
270    /// for the DC, since redoing DH on every reconnect is wasted work.
271    pub async fn connect_raw(
272        addr: &str,
273        socks5: Option<&crate::socks5::Socks5Config>,
274        mtproxy: Option<&crate::proxy::MtProxyConfig>,
275        transport: &TransportKind,
276        dc_id: i16,
277    ) -> Result<Self, ConnectError> {
278        let t_label = match transport {
279            TransportKind::Abridged => "Abridged",
280            TransportKind::Obfuscated { .. } => "Obfuscated",
281            TransportKind::PaddedIntermediate { .. } => "PaddedIntermediate",
282            TransportKind::Http => "Http",
283            TransportKind::Intermediate => "Intermediate",
284            TransportKind::Full => "Full",
285            TransportKind::FakeTls { .. } => "FakeTls",
286        };
287        tracing::debug!("[ferogram::connect] starting DH handshake with {addr} via {t_label}");
288
289        let addr2 = addr.to_string();
290        let socks5_c = socks5.cloned();
291        let mtproxy_c = mtproxy.cloned();
292        let transport_c = transport.clone();
293
294        let fut = async move {
295            let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
296                Self::open_stream_mtproxy(mp, dc_id).await?
297            } else {
298                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
299            };
300
301            let mut plain = Session::new();
302
303            let (req1, s1) = auth::step1().map_err(|e| ConnectError::other(e.to_string()))?;
304            send_frame(
305                &mut stream,
306                &plain.pack(&req1).to_plaintext_bytes(),
307                &frame_kind,
308            )
309            .await?;
310            let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
311
312            let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
313                .map_err(|e| ConnectError::other(e.to_string()))?;
314            send_frame(
315                &mut stream,
316                &plain.pack(&req2).to_plaintext_bytes(),
317                &frame_kind,
318            )
319            .await?;
320            let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
321
322            let (req3, s3) = auth::step3(s2, dh).map_err(|e| ConnectError::other(e.to_string()))?;
323            send_frame(
324                &mut stream,
325                &plain.pack(&req3).to_plaintext_bytes(),
326                &frame_kind,
327            )
328            .await?;
329            let ans: tl::enums::SetClientDhParamsAnswer =
330                recv_frame_plain(&mut stream, &frame_kind).await?;
331
332            // Retry loop for dh_gen_retry (up to 5 attempts).
333            let done = {
334                let mut result =
335                    auth::finish(s3, ans).map_err(|e| ConnectError::other(e.to_string()))?;
336                let mut attempts = 0u8;
337                loop {
338                    match result {
339                        auth::FinishResult::Done(d) => break d,
340                        auth::FinishResult::Retry {
341                            retry_id,
342                            dh_params,
343                            nonce,
344                            server_nonce,
345                            new_nonce,
346                        } => {
347                            attempts += 1;
348                            if attempts >= 5 {
349                                return Err(ConnectError::other(
350                                    "dh_gen_retry exceeded 5 attempts",
351                                ));
352                            }
353                            let (req_retry, s3_retry) = auth::retry_step3(
354                                &dh_params,
355                                nonce,
356                                server_nonce,
357                                new_nonce,
358                                retry_id,
359                            )
360                            .map_err(|e| ConnectError::other(e.to_string()))?;
361                            send_frame(
362                                &mut stream,
363                                &plain.pack(&req_retry).to_plaintext_bytes(),
364                                &frame_kind,
365                            )
366                            .await?;
367                            let ans_retry: tl::enums::SetClientDhParamsAnswer =
368                                recv_frame_plain(&mut stream, &frame_kind).await?;
369                            result = auth::finish(s3_retry, ans_retry)
370                                .map_err(|e| ConnectError::other(e.to_string()))?;
371                        }
372                    }
373                }
374            };
375            tracing::debug!("[ferogram::connect] DH handshake complete, auth key established");
376
377            Ok::<Self, ConnectError>(Self {
378                stream,
379                enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
380                frame_kind,
381                perm_auth_key: None, // connect_raw produces the perm key itself
382            })
383        };
384
385        tokio::time::timeout(Duration::from_secs(15), fut)
386            .await
387            .map_err(|_| {
388                ConnectError::other(format!("DH handshake with {addr} timed out after 15 s"))
389            })?
390    }
391
392    /// Open a connection using an `auth_key` already negotiated for this DC,
393    /// skipping the DH handshake entirely. If `pfs` is set, also binds a
394    /// temporary key for this session via `auth.bindTempAuthKey`, falling
395    /// back to the permanent key if the bind fails.
396    #[allow(clippy::too_many_arguments)]
397    pub async fn connect_with_key(
398        addr: &str,
399        auth_key: [u8; 256],
400        first_salt: i64,
401        time_offset: i32,
402        socks5: Option<&crate::socks5::Socks5Config>,
403        mtproxy: Option<&crate::proxy::MtProxyConfig>,
404        transport: &TransportKind,
405        dc_id: i16,
406        pfs: bool,
407    ) -> Result<Self, ConnectError> {
408        let addr2 = addr.to_string();
409        let socks5_c = socks5.cloned();
410        let mtproxy_c = mtproxy.cloned();
411        let transport_c = transport.clone();
412
413        let fut = async move {
414            let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
415                Self::open_stream_mtproxy(mp, dc_id).await?
416            } else {
417                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
418            };
419            if pfs {
420                tracing::debug!("[ferogram::connect] PFS: binding temporary key for DC{dc_id}");
421                match Self::do_pfs_bind(&mut stream, &frame_kind, &auth_key, dc_id).await {
422                    Ok(temp_enc) => {
423                        tracing::debug!(
424                            "[ferogram::connect] PFS: temporary key bound for DC{dc_id}"
425                        );
426                        return Ok(Self {
427                            stream,
428                            enc: temp_enc,
429                            frame_kind,
430                            perm_auth_key: Some(auth_key),
431                        });
432                    }
433                    Err(e) => {
434                        tracing::warn!(
435                            "[ferogram::connect] PFS bind failed for DC{dc_id} ({e}); falling back to permanent key"
436                        );
437                        // Graceful fallback: reconnect because DH frames left the stream dirty.
438                        // Return error and let the caller handle retry without PFS.
439                        return Err(e);
440                    }
441                }
442            }
443            Ok::<Self, ConnectError>(Self {
444                stream,
445                enc: EncryptedSession::new(auth_key, first_salt, time_offset),
446                frame_kind,
447                perm_auth_key: None,
448            })
449        };
450
451        tokio::time::timeout(Duration::from_secs(30), fut)
452            .await
453            .map_err(|_| {
454                ConnectError::other(format!("connect_with_key to {addr} timed out after 30 s"))
455            })?
456    }
457
458    /// Perform a fresh temp-key DH on an already-open stream, then
459    /// send `auth.bindTempAuthKey` encrypted with the temp key.
460    /// Returns an `EncryptedSession` keyed with the bound temp key.
461    async fn do_pfs_bind(
462        stream: &mut TcpStream,
463        frame_kind: &FrameKind,
464        perm_auth_key: &[u8; 256],
465        dc_id: i16,
466    ) -> Result<EncryptedSession, ConnectError> {
467        use ferogram_mtproto::{
468            auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
469            serialize_bind_temp_auth_key,
470        };
471        const TEMP_EXPIRES: i32 = 86_400; // 24 h
472
473        // temp-key DH
474        let mut plain = Session::new();
475
476        let (req1, s1) = auth::step1().map_err(|e| ConnectError::other(e.to_string()))?;
477        send_frame(stream, &plain.pack(&req1).to_plaintext_bytes(), frame_kind).await?;
478        let res_pq: tl::enums::ResPq = recv_frame_plain(stream, frame_kind).await?;
479
480        let (req2, s2) = ferogram_mtproto::step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
481            .map_err(|e| ConnectError::other(e.to_string()))?;
482        send_frame(stream, &plain.pack(&req2).to_plaintext_bytes(), frame_kind).await?;
483        let dh: tl::enums::ServerDhParams = recv_frame_plain(stream, frame_kind).await?;
484
485        let (req3, s3) = auth::step3(s2, dh).map_err(|e| ConnectError::other(e.to_string()))?;
486        send_frame(stream, &plain.pack(&req3).to_plaintext_bytes(), frame_kind).await?;
487        let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(stream, frame_kind).await?;
488
489        let done = {
490            let mut result =
491                auth::finish(s3, ans).map_err(|e| ConnectError::other(e.to_string()))?;
492            let mut attempts = 0u8;
493            loop {
494                match result {
495                    ferogram_mtproto::FinishResult::Done(d) => break d,
496                    ferogram_mtproto::FinishResult::Retry {
497                        retry_id,
498                        dh_params,
499                        nonce,
500                        server_nonce,
501                        new_nonce,
502                    } => {
503                        attempts += 1;
504                        if attempts >= 5 {
505                            return Err(ConnectError::other(
506                                "PFS temp DH retry exceeded 5 attempts",
507                            ));
508                        }
509                        let (rr, s3r) = ferogram_mtproto::retry_step3(
510                            &dh_params,
511                            nonce,
512                            server_nonce,
513                            new_nonce,
514                            retry_id,
515                        )
516                        .map_err(|e| ConnectError::other(e.to_string()))?;
517                        send_frame(stream, &plain.pack(&rr).to_plaintext_bytes(), frame_kind)
518                            .await?;
519                        let ar: tl::enums::SetClientDhParamsAnswer =
520                            recv_frame_plain(stream, frame_kind).await?;
521                        result = auth::finish(s3r, ar)
522                            .map_err(|e| ConnectError::other(e.to_string()))?;
523                    }
524                }
525            }
526        };
527
528        let temp_key = done.auth_key;
529        let temp_salt = done.first_salt;
530        let temp_offset = done.time_offset;
531
532        // build bindTempAuthKey body
533        let temp_key_id = auth_key_id_from_key(&temp_key);
534        let perm_key_id = auth_key_id_from_key(perm_auth_key);
535
536        let mut nonce_buf = [0u8; 8];
537        ferogram_crypto::fill_random(&mut nonce_buf);
538        let nonce = i64::from_le_bytes(nonce_buf);
539
540        let server_now = std::time::SystemTime::now()
541            .duration_since(std::time::UNIX_EPOCH)
542            .unwrap()
543            .as_secs() as i32
544            + temp_offset;
545        let expires_at = server_now + TEMP_EXPIRES;
546
547        let seen = new_seen_msg_ids();
548        let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
549        let temp_session_id = temp_enc.session_id();
550
551        let msg_id = gen_msg_id();
552        let enc_msg = encrypt_bind_inner(
553            perm_auth_key,
554            msg_id,
555            nonce,
556            temp_key_id,
557            perm_key_id,
558            temp_session_id,
559            expires_at,
560        );
561        let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
562
563        // send encrypted bind request
564        let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
565        send_frame(stream, &wire, frame_kind).await?;
566
567        // Receive and verify response.
568        // The server may send informational frames first (msgs_ack, new_session_created)
569        // before the actual rpc_result{boolTrue}, so we loop up to 5 frames.
570        for attempt in 0u8..5 {
571            let mut raw = recv_raw_frame(stream, frame_kind).await?;
572            let decrypted = temp_enc
573                .unpack(&mut raw)
574                .map_err(|e| ConnectError::other(format!("PFS bind decrypt: {e:?}")))?;
575            match decode_bind_response(&decrypted.body) {
576                Ok(()) => {
577                    // bindTempAuthKey succeeds under the temp key; keep the session
578                    // sequence as-is so subsequent RPCs continue from the same MTProto
579                    // message stream.
580                    return Ok(temp_enc);
581                }
582                Err(ref e) if e == "__need_more__" => {
583                    tracing::debug!(
584                        "[ferogram::connect] PFS (DC{dc_id}): got informational frame on attempt {attempt}, reading next"
585                    );
586                    continue;
587                }
588                Err(reason) => {
589                    tracing::error!(
590                        "[ferogram::connect] PFS bind rejected by server for DC{dc_id}: {reason}"
591                    );
592                    return Err(ConnectError::other(format!(
593                        "auth.bindTempAuthKey: {reason}"
594                    )));
595                }
596            }
597        }
598        Err(ConnectError::other(
599            "auth.bindTempAuthKey: no boolTrue after 5 frames",
600        ))
601    }
602
603    /// The permanent auth key, for persisting to the session. Under PFS this
604    /// is `perm_auth_key`, not the short-lived temp key the connection is
605    /// actually encrypted with.
606    pub fn auth_key_bytes(&self) -> [u8; 256] {
607        // When PFS is active, perm_auth_key is the key to persist in the session.
608        // enc.auth_key_bytes() would return the short-lived temp key instead.
609        self.perm_auth_key
610            .unwrap_or_else(|| self.enc.auth_key_bytes())
611    }
612
613    /// Open a TCP connection, negotiate transport framing, and complete the MTProto DH handshake.
614    ///
615    /// Returns `(stream, frame_kind, session)` as owned values. The caller is responsible for
616    /// setting up reader/writer tasks. This is the single authoritative connection path;
617    /// `ferogram-mtsender` delegates here instead of reimplementing the DH sequence.
618    pub async fn connect_to_dc(
619        addr: &str,
620        dc_id: i16,
621        transport: &TransportKind,
622        socks5: Option<&crate::socks5::Socks5Config>,
623        mtproxy: Option<&crate::proxy::MtProxyConfig>,
624    ) -> Result<(TcpStream, FrameKind, EncryptedSession), ConnectError> {
625        let conn = Self::connect_raw(addr, socks5, mtproxy, transport, dc_id).await?;
626        Ok((conn.stream, conn.frame_kind, conn.enc))
627    }
628}
629
/// Free-function wrapper around [`Connection::connect_to_dc`].
///
/// Opens a TCP connection, negotiates transport framing, and completes the
/// MTProto DH handshake. Returns `(stream, frame_kind, session)`.
///
/// All arguments are forwarded unchanged, and any error from
/// [`Connection::connect_to_dc`] is returned as-is.
pub async fn connect_to_dc(
    addr: &str,
    dc_id: i16,
    transport: &TransportKind,
    socks5: Option<&crate::socks5::Socks5Config>,
    mtproxy: Option<&crate::proxy::MtProxyConfig>,
) -> Result<(TcpStream, FrameKind, EncryptedSession), ConnectError> {
    Connection::connect_to_dc(addr, dc_id, transport, socks5, mtproxy).await
}