Skip to main content

ferogram_mtsender/
sender.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13use ferogram_mtproto::{
14    EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
15};
16use ferogram_tl_types as tl;
17use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::TcpStream;
20
21use crate::errors::InvocationError;
22use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
23use ferogram_connect::TransportKind;
24// metrics and tracing
25#[allow(unused_imports)]
26use metrics::{counter, histogram};
27
28/// A single encrypted connection to one Telegram DC.
29/// Un-acked server msg_ids to accumulate before eagerly flushing a `msgs_ack` frame.
30const PENDING_ACKS_THRESHOLD: usize = 10;
31
32/// `PingDelayDisconnect` interval for worker connections (in GetFile chunks).
33/// Keeps the socket alive within Telegram's 75-second idle-disconnect window.
34const PING_EVERY_N_CHUNKS: u32 = 5;
35
36pub struct DcConnection {
37    stream: TcpStream,
38    enc: EncryptedSession,
39    pending_acks: Vec<i64>,
40    call_count: u32,
41    /// AES-256-CTR cipher for obfuscated transport; None for plain transports.
42    cipher: Option<ferogram_crypto::ObfuscatedCipher>,
43    /// Persistent dedup ring that outlives individual EncryptedSessions.
44    #[allow(dead_code)]
45    seen_msg_ids: SeenMsgIds,
46}
47
48impl DcConnection {
49    /// Race Obfuscated / Abridged / Http transports and return the first to succeed.
50    #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
51    pub async fn connect_fastest(
52        addr: &str,
53        socks5: Option<&ferogram_connect::Socks5Config>,
54        dc_id: i16,
55    ) -> Result<(Self, &'static str), InvocationError> {
56        use tokio::task::JoinSet;
57        let addr = addr.to_owned();
58        let socks5 = socks5.cloned();
59        tracing::debug!("[dc_pool] probing {addr} with 3 transports");
60        let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
61            JoinSet::new();
62
63        {
64            let a = addr.clone();
65            let s = socks5.clone();
66            set.spawn(async move {
67                Ok((
68                    DcConnection::connect_raw(
69                        &a,
70                        s.as_ref(),
71                        &TransportKind::Obfuscated { secret: None },
72                        dc_id,
73                    )
74                    .await?,
75                    "Obfuscated",
76                ))
77            });
78        }
79        {
80            let a = addr.clone();
81            let s = socks5.clone();
82            set.spawn(async move {
83                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
84                Ok((
85                    DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
86                        .await?,
87                    "Abridged",
88                ))
89            });
90        }
91        {
92            let a = addr.clone();
93            set.spawn(async move {
94                tokio::time::sleep(std::time::Duration::from_millis(800)).await;
95                Ok((
96                    DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
97                    "Http",
98                ))
99            });
100        }
101
102        let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
103        while let Some(outcome) = set.join_next().await {
104            match outcome {
105                Ok(Ok((conn, label))) => {
106                    set.abort_all();
107                    return Ok((conn, label));
108                }
109                Ok(Err(e)) => {
110                    last_err = e;
111                }
112                Err(e) if e.is_cancelled() => {}
113                Err(_) => {}
114            }
115        }
116        Err(last_err)
117    }
118
119    /// Connect and perform full DH handshake.
120    #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
121    pub async fn connect_raw(
122        addr: &str,
123        socks5: Option<&ferogram_connect::Socks5Config>,
124        transport: &TransportKind,
125        dc_id: i16,
126    ) -> Result<Self, InvocationError> {
127        tracing::debug!("[dc_pool] Connecting to {addr} …");
128        let mut stream = Self::open_tcp(addr, socks5).await?;
129        let mut cipher = Self::send_transport_init(&mut stream, transport, dc_id).await?;
130
131        let mut plain = Session::new();
132
133        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
134        Self::send_plain_frame(
135            &mut stream,
136            &plain.pack(&req1).to_plaintext_bytes(),
137            cipher.as_mut(),
138        )
139        .await?;
140        let res_pq: tl::enums::ResPq = Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
141
142        let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
143            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
144        Self::send_plain_frame(
145            &mut stream,
146            &plain.pack(&req2).to_plaintext_bytes(),
147            cipher.as_mut(),
148        )
149        .await?;
150        let dh: tl::enums::ServerDhParams =
151            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
152
153        let (req3, s3) =
154            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
155        Self::send_plain_frame(
156            &mut stream,
157            &plain.pack(&req3).to_plaintext_bytes(),
158            cipher.as_mut(),
159        )
160        .await?;
161        let ans: tl::enums::SetClientDhParamsAnswer =
162            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
163
164        // Retry loop for dh_gen_retry (up to 5 attempts).
165        let done = {
166            let mut result =
167                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
168            let mut attempts = 0u8;
169            loop {
170                match result {
171                    auth::FinishResult::Done(d) => break d,
172                    auth::FinishResult::Retry {
173                        retry_id,
174                        dh_params,
175                        nonce,
176                        server_nonce,
177                        new_nonce,
178                    } => {
179                        attempts += 1;
180                        if attempts >= 5 {
181                            return Err(InvocationError::Deserialize(
182                                "dh_gen_retry exceeded 5 attempts".into(),
183                            ));
184                        }
185                        let (req_retry, s3_retry) =
186                            auth::retry_step3(&dh_params, nonce, server_nonce, new_nonce, retry_id)
187                                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
188                        Self::send_plain_frame(
189                            &mut stream,
190                            &plain.pack(&req_retry).to_plaintext_bytes(),
191                            cipher.as_mut(),
192                        )
193                        .await?;
194                        let ans_retry: tl::enums::SetClientDhParamsAnswer =
195                            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
196                        result = auth::finish(s3_retry, ans_retry)
197                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
198                    }
199                }
200            }
201        };
202        tracing::debug!("[dc_pool] DH complete ✓ for {addr}");
203
204        let seen = new_seen_msg_ids();
205        Ok(Self {
206            stream,
207            cipher,
208            enc: EncryptedSession::with_seen(
209                done.auth_key,
210                done.first_salt,
211                done.time_offset,
212                seen.clone(),
213            ),
214            pending_acks: Vec::new(),
215            call_count: 0,
216            seen_msg_ids: seen,
217        })
218    }
219
220    /// Connect with an already-known auth key (no DH needed).
221    /// If `pfs` is true, performs a temp-key DH bind before any RPCs.
222    #[allow(clippy::too_many_arguments)]
223    pub async fn connect_with_key(
224        addr: &str,
225        auth_key: [u8; 256],
226        first_salt: i64,
227        time_offset: i32,
228        socks5: Option<&ferogram_connect::Socks5Config>,
229        mtproxy: Option<&ferogram_connect::MtProxyConfig>,
230        transport: &TransportKind,
231        dc_id: i16,
232        pfs: bool,
233    ) -> Result<Self, InvocationError> {
234        let (mut stream, mut cipher) = if let Some(mp) = mtproxy {
235            let mut s = mp.connect().await?;
236            s.set_nodelay(true)?;
237            let c = Self::send_transport_init(&mut s, &mp.transport, dc_id).await?;
238            (s, c)
239        } else {
240            let mut s = Self::open_tcp(addr, socks5).await?;
241            let c = Self::send_transport_init(&mut s, transport, dc_id).await?;
242            (s, c)
243        };
244
245        if pfs {
246            tracing::debug!("[dc_pool] PFS: temp DH bind for DC{dc_id}");
247            match Self::do_pool_pfs_bind(&mut stream, cipher.as_mut(), &auth_key, dc_id).await {
248                Ok(temp_enc) => {
249                    tracing::info!("[dc_pool] PFS bind complete DC{dc_id}");
250                    return Ok(Self {
251                        stream,
252                        cipher,
253                        enc: temp_enc,
254                        pending_acks: Vec::new(),
255                        call_count: 0,
256                        seen_msg_ids: new_seen_msg_ids(),
257                    });
258                }
259                Err(e) => {
260                    tracing::warn!("[dc_pool] PFS bind failed DC{dc_id} ({e}); falling back");
261                    return Err(e);
262                }
263            }
264        }
265
266        let seen = new_seen_msg_ids();
267        Ok(Self {
268            stream,
269            cipher,
270            enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
271            pending_acks: Vec::new(),
272            call_count: 0,
273            seen_msg_ids: seen,
274        })
275    }
276
277    /// Temp-key DH handshake + auth.bindTempAuthKey on an existing stream.
278    #[allow(clippy::needless_option_as_deref)]
279    async fn do_pool_pfs_bind(
280        stream: &mut tokio::net::TcpStream,
281        mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
282        perm_auth_key: &[u8; 256],
283        dc_id: i16,
284    ) -> Result<EncryptedSession, InvocationError> {
285        use ferogram_mtproto::{
286            auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
287            serialize_bind_temp_auth_key,
288        };
289        const TEMP_EXPIRES: i32 = 86_400; // 24 h
290
291        // temp-key DH
292        let mut plain = ferogram_mtproto::Session::new();
293
294        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
295        Self::send_plain_frame(
296            stream,
297            &plain.pack(&req1).to_plaintext_bytes(),
298            cipher.as_deref_mut(),
299        )
300        .await?;
301        let res_pq: tl::enums::ResPq =
302            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
303
304        let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
305            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
306        Self::send_plain_frame(
307            stream,
308            &plain.pack(&req2).to_plaintext_bytes(),
309            cipher.as_deref_mut(),
310        )
311        .await?;
312        let dh: tl::enums::ServerDhParams =
313            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
314
315        let (req3, s3) =
316            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
317        Self::send_plain_frame(
318            stream,
319            &plain.pack(&req3).to_plaintext_bytes(),
320            cipher.as_deref_mut(),
321        )
322        .await?;
323        let ans: tl::enums::SetClientDhParamsAnswer =
324            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
325
326        let done = {
327            let mut result =
328                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
329            let mut attempts = 0u8;
330            loop {
331                match result {
332                    ferogram_mtproto::FinishResult::Done(d) => break d,
333                    ferogram_mtproto::FinishResult::Retry {
334                        retry_id,
335                        dh_params,
336                        nonce,
337                        server_nonce,
338                        new_nonce,
339                    } => {
340                        attempts += 1;
341                        if attempts >= 5 {
342                            return Err(InvocationError::Deserialize(
343                                "PFS pool temp DH retry exceeded 5".into(),
344                            ));
345                        }
346                        let (rr, s3r) = ferogram_mtproto::retry_step3(
347                            &dh_params,
348                            nonce,
349                            server_nonce,
350                            new_nonce,
351                            retry_id,
352                        )
353                        .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
354                        Self::send_plain_frame(
355                            stream,
356                            &plain.pack(&rr).to_plaintext_bytes(),
357                            cipher.as_deref_mut(),
358                        )
359                        .await?;
360                        let ar: tl::enums::SetClientDhParamsAnswer =
361                            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
362                        result = auth::finish(s3r, ar)
363                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
364                    }
365                }
366            }
367        };
368
369        let temp_key = done.auth_key;
370        let temp_salt = done.first_salt;
371        let temp_offset = done.time_offset;
372
373        // build bindTempAuthKey body
374        let temp_key_id = auth_key_id_from_key(&temp_key);
375        let perm_key_id = auth_key_id_from_key(perm_auth_key);
376
377        let mut nonce_buf = [0u8; 8];
378        getrandom::getrandom(&mut nonce_buf)
379            .map_err(|_| InvocationError::Deserialize("getrandom nonce".into()))?;
380        let nonce = i64::from_le_bytes(nonce_buf);
381
382        let server_now = std::time::SystemTime::now()
383            .duration_since(std::time::UNIX_EPOCH)
384            .expect("system clock is before UNIX epoch")
385            .as_secs() as i32
386            + temp_offset;
387        let expires_at = server_now + TEMP_EXPIRES;
388
389        let seen = new_seen_msg_ids();
390        let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
391        let temp_session_id = temp_enc.session_id();
392
393        let msg_id = gen_msg_id();
394        let enc_msg = encrypt_bind_inner(
395            perm_auth_key,
396            msg_id,
397            nonce,
398            temp_key_id,
399            perm_key_id,
400            temp_session_id,
401            expires_at,
402        );
403        let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
404
405        // send encrypted bind request
406        let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
407        Self::send_abridged(stream, &wire, cipher.as_deref_mut()).await?;
408
409        // Receive and verify response.
410        // The server may send informational frames first (msgs_ack, new_session_created)
411        // before the actual rpc_result{boolTrue}, so we loop up to 5 frames.
412        for attempt in 0u8..5 {
413            let mut raw = Self::recv_abridged(stream, cipher.as_deref_mut()).await?;
414            let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
415                InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
416            })?;
417            match pfs_pool_decode_bind_response(&decrypted.body) {
418                Ok(()) => {
419                    // bindTempAuthKey succeeds under the temp key; keep the session
420                    // sequence as-is so subsequent RPCs continue from the same MTProto
421                    // message stream.
422                    return Ok(temp_enc);
423                }
424                Err(ref e) if e == "__need_more__" => {
425                    tracing::debug!(
426                        "[ferogram] PFS pool bind (DC{dc_id}): informational frame {attempt}, reading next"
427                    );
428                    continue;
429                }
430                Err(reason) => {
431                    tracing::error!(
432                        "[ferogram] PFS pool bind server response (DC{dc_id}): {reason}"
433                    );
434                    return Err(InvocationError::Deserialize(format!(
435                        "auth.bindTempAuthKey (pool): {reason}"
436                    )));
437                }
438            }
439        }
440        Err(InvocationError::Deserialize(
441            "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
442        ))
443    }
444
445    async fn open_tcp(
446        addr: &str,
447        socks5: Option<&ferogram_connect::Socks5Config>,
448    ) -> Result<TcpStream, InvocationError> {
449        let stream = match socks5 {
450            Some(proxy) => proxy.connect(addr).await?,
451            None => TcpStream::connect(addr).await?,
452        };
453        // Disable Nagle for immediate single-frame delivery.
454        stream.set_nodelay(true)?;
455        // SO_KEEPALIVE keeps worker connections alive across idle periods.
456        {
457            let sock = socket2::SockRef::from(&stream);
458            let ka = socket2::TcpKeepalive::new()
459                .with_time(std::time::Duration::from_secs(10))
460                .with_interval(std::time::Duration::from_secs(5));
461            #[cfg(not(target_os = "windows"))]
462            let ka = ka.with_retries(3);
463            sock.set_tcp_keepalive(&ka).ok();
464        }
465        Ok(stream)
466    }
467
468    async fn send_transport_init(
469        stream: &mut TcpStream,
470        transport: &TransportKind,
471        dc_id: i16,
472    ) -> Result<Option<ferogram_crypto::ObfuscatedCipher>, InvocationError> {
473        match transport {
474            TransportKind::Abridged => {
475                stream.write_all(&[0xef]).await?;
476            }
477            TransportKind::Intermediate => {
478                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
479            }
480            TransportKind::Full => {}
481            TransportKind::Obfuscated { secret } => {
482                use sha2::Digest;
483                let mut nonce = [0u8; 64];
484                loop {
485                    getrandom::getrandom(&mut nonce)
486                        .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
487                    let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
488                    let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
489                    let bad = nonce[0] == 0xEF
490                        || first == 0x44414548
491                        || first == 0x54534F50
492                        || first == 0x20544547
493                        || first == 0x4954504f  // OPTIONS
494                        || first == 0xEEEEEEEE
495                        || first == 0xDDDDDDDD
496                        || first == 0x02010316
497                        || second == 0x00000000;
498                    if !bad {
499                        break;
500                    }
501                }
502                let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
503                let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
504                let mut rev48 = nonce[8..56].to_vec();
505                rev48.reverse();
506                let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
507                let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
508                let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
509                    let mut h = sha2::Sha256::new();
510                    h.update(tx_raw);
511                    h.update(s.as_ref());
512                    let tx: [u8; 32] = h.finalize().into();
513                    let mut h = sha2::Sha256::new();
514                    h.update(rx_raw);
515                    h.update(s.as_ref());
516                    let rx: [u8; 32] = h.finalize().into();
517                    (tx, rx)
518                } else {
519                    (tx_raw, rx_raw)
520                };
521                nonce[56] = 0xef;
522                nonce[57] = 0xef;
523                nonce[58] = 0xef;
524                nonce[59] = 0xef;
525                let dc_bytes = dc_id.to_le_bytes();
526                nonce[60] = dc_bytes[0];
527                nonce[61] = dc_bytes[1];
528                let mut enc =
529                    ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
530                let mut skip = [0u8; 56];
531                enc.encrypt(&mut skip);
532                enc.encrypt(&mut nonce[56..64]);
533                stream.write_all(&nonce).await?;
534                return Ok(Some(enc));
535            }
536            TransportKind::PaddedIntermediate { secret } => {
537                use sha2::Digest;
538                let mut nonce = [0u8; 64];
539                loop {
540                    getrandom::getrandom(&mut nonce)
541                        .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
542                    let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
543                    let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
544                    let bad = nonce[0] == 0xEF
545                        || first == 0x44414548
546                        || first == 0x54534F50
547                        || first == 0x20544547
548                        || first == 0x4954504f
549                        || first == 0xEEEEEEEE
550                        || first == 0xDDDDDDDD
551                        || first == 0x02010316
552                        || second == 0x00000000;
553                    if !bad {
554                        break;
555                    }
556                }
557                let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
558                let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
559                let mut rev48 = nonce[8..56].to_vec();
560                rev48.reverse();
561                let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
562                let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
563                let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
564                    let mut h = sha2::Sha256::new();
565                    h.update(tx_raw);
566                    h.update(s.as_ref());
567                    let tx: [u8; 32] = h.finalize().into();
568                    let mut h = sha2::Sha256::new();
569                    h.update(rx_raw);
570                    h.update(s.as_ref());
571                    let rx: [u8; 32] = h.finalize().into();
572                    (tx, rx)
573                } else {
574                    (tx_raw, rx_raw)
575                };
576                nonce[56] = 0xdd;
577                nonce[57] = 0xdd;
578                nonce[58] = 0xdd;
579                nonce[59] = 0xdd;
580                let dc_bytes = dc_id.to_le_bytes();
581                nonce[60] = dc_bytes[0];
582                nonce[61] = dc_bytes[1];
583                let mut enc =
584                    ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
585                let mut skip = [0u8; 56];
586                enc.encrypt(&mut skip);
587                enc.encrypt(&mut nonce[56..64]);
588                stream.write_all(&nonce).await?;
589                return Ok(Some(enc));
590            }
591            TransportKind::FakeTls { .. } => {
592                // FakeTls requires a full TLS 1.2 ClientHello handshake which is not yet
593                // implemented in DcPool worker connections. Use Obfuscated or
594                // PaddedIntermediate for proxy connections instead.
595                return Err(InvocationError::Deserialize(
596                    "FakeTls transport is not supported for DcPool connections".into(),
597                ));
598            }
599            TransportKind::Http => {}
600        }
601        Ok(None)
602    }
603
604    pub fn auth_key_bytes(&self) -> [u8; 256] {
605        self.enc.auth_key_bytes()
606    }
607    pub fn first_salt(&self) -> i64 {
608        self.enc.salt
609    }
610    pub fn time_offset(&self) -> i32 {
611        self.enc.time_offset
612    }
613
614    #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
615    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
616        let _t0 = std::time::Instant::now();
617        // Periodic PingDelayDisconnect: sent before the request to piggyback on
618        // the same TCP write window.  Keeps the socket alive across the download.
619        self.call_count += 1;
620        if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
621            let ping_id = self.call_count as i64;
622            let ping_body = build_msgs_ack_ping_body(ping_id);
623            // PingDelayDisconnect is content-related (returns Pong): must use odd seq_no.
624            let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
625            // This ping is fire-and-forget. The Pong response is a content-related
626            // server message and must be acknowledged. If the RPC result arrives before
627            // the Pong, the Pong's msg_id is never added to pending_acks. On idle
628            // connections (no subsequent RPCs) the un-acked Pong will eventually cause
629            // Telegram to close the connection. A dedicated always-running reader task
630            // that drains and acks all server messages would fix this permanently; for
631            // now the next rpc_call iteration receives and acks the Pong via pending_acks.
632            let _ = Self::send_abridged(&mut self.stream, &ping_wire, self.cipher.as_mut()).await;
633        }
634
635        // Flush pending acks.
636        if !self.pending_acks.is_empty() {
637            let ack_body = build_msgs_ack_body(&self.pending_acks);
638            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
639            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
640            self.pending_acks.clear();
641        }
642
643        // Track sent msg_id to verify rpc_result.req_msg_id and discard stale responses.
644        let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
645        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
646        let mut salt_retries = 0u8;
647        let mut session_resets = 0u8;
648        loop {
649            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
650            let msg = self
651                .enc
652                .unpack(&mut raw)
653                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
654            // Track every received msg_id for acknowledgement.
655            self.pending_acks.push(msg.msg_id);
656            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
657                // Eager flush: too many un-acked messages  - Telegram will close the
658                // connection if we don't ack within its window.
659                let ack_body = build_msgs_ack_body(&self.pending_acks);
660                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
661                let _ =
662                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
663                self.pending_acks.clear();
664            }
665            // Salt is updated only on explicit bad_server_salt, not on every message.
666            if msg.body.len() < 4 {
667                return Ok(msg.body);
668            }
669            let mut need_resend = false;
670            let mut need_session_reset = false;
671            let mut bad_msg_code: Option<u32> = None;
672            let mut bad_msg_server_id: Option<i64> = None;
673            // Process all flags before returning: containers may carry
674            // new_session_created + rpc_result together.
675            let scan_result = Self::scan_body(
676                &msg.body,
677                &mut self.enc.salt,
678                &mut need_resend,
679                &mut need_session_reset,
680                &mut bad_msg_code,
681                &mut bad_msg_server_id,
682                Some(sent_msg_id),
683                msg.msg_id,
684            )?;
685            // new_session_created requires seq_no reset to 0.
686            if need_session_reset {
687                session_resets += 1;
688                if session_resets > 2 {
689                    return Err(InvocationError::Deserialize(
690                        "new_session_created: exceeded 2 resets".into(),
691                    ));
692                }
693                if !self.pending_acks.is_empty() {
694                    let ack_body = build_msgs_ack_body(&self.pending_acks);
695                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
696                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
697                        .await;
698                    self.pending_acks.clear();
699                }
700                // Keep the current session sequence. new_session_created updates the
701                // server salt and may require resending stale requests, but it does
702                // not require zeroing the local MTProto seq counter.
703                if scan_result.is_none() {
704                    // No result yet; resend using the current MTProto sequence.
705                    tracing::debug!(
706                        "[dc_pool] new_session_created: resending [{session_resets}/2]"
707                    );
708                    let (wire, new_id) = self.enc.pack_with_msg_id(req);
709                    sent_msg_id = new_id;
710                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
711                }
712                // If scan_result.is_some(), the result arrived in the same container
713                // as new_session_created; session has been reset for future calls,
714                // fall through to return the result.
715            } else if need_resend {
716                // Apply seq_no / time corrections from bad_msg_notification.
717                match bad_msg_code {
718                    Some(16) | Some(17) => {
719                        if let Some(srv_id) = bad_msg_server_id {
720                            self.enc.correct_time_offset(srv_id);
721                        }
722                        // Do not call undo_seq_no here. Reusing the same seq_no on a
723                        // retry violates MTProto monotonicity; the server may reject
724                        // with code 32. Let the next pack_with_msg_id assign the next
725                        // available odd seq_no for the resent message.
726                    }
727                    Some(32) | Some(33) => {
728                        // correct_seq_no does a full session reset (new session_id,
729                        // seq_no=0) instead of magic +/- offsets.
730                        self.enc
731                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
732                    }
733                    _ => {
734                        // bad_server_salt or bad_msg code 48
735                        self.enc.undo_seq_no();
736                    }
737                }
738                salt_retries += 1;
739                if salt_retries >= 5 {
740                    return Err(InvocationError::Deserialize(
741                        "bad_server_salt/bad_msg: exceeded 5 retries".into(),
742                    ));
743                }
744                tracing::debug!(
745                    "[dc_pool] resend in transfer conn (code={bad_msg_code:?}) [{salt_retries}/5]"
746                );
747                if !self.pending_acks.is_empty() {
748                    let ack_body = build_msgs_ack_body(&self.pending_acks);
749                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
750                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
751                        .await;
752                    self.pending_acks.clear();
753                }
754                let (wire, new_id) = self.enc.pack_with_msg_id(req);
755                sent_msg_id = new_id;
756                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
757            }
758            if let Some(result) = scan_result {
759                metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
760                metrics::histogram!("ferogram.rpc_latency_ms")
761                    .record(_t0.elapsed().as_millis() as f64);
762                return Ok(result);
763            }
764        }
765    }
766    ///
767    /// Returns `Ok(Some(bytes))` when rpc_result is found.
768    /// Returns `Ok(None)` for informational messages (continue reading).
769    /// Returns `Err` for rpc_error or parse failures.
770    ///
771    /// Output flags:
772    /// - `need_resend`: set for bad_server_salt / bad_msg_notification (codes 16/17/32/33/48)
773    /// - `need_session_reset`: set for new_session_created (seq_no must reset to 0)
774    /// - `bad_msg_code`: error_code from bad_msg_notification for caller to apply correction
775    /// - `bad_msg_server_id`: server msg_id for time-offset correction (codes 16/17)
776    /// - `server_msg_id`: outer frame msg_id for time-offset correction (codes 16/17).
777    ///   Must be msg.msg_id from the caller, not bad_msg_id (client clock, not server's).
778    #[allow(clippy::too_many_arguments)]
779    fn scan_body(
780        body: &[u8],
781        salt: &mut i64,
782        need_resend: &mut bool,
783        need_session_reset: &mut bool,
784        bad_msg_code: &mut Option<u32>,
785        bad_msg_server_id: &mut Option<i64>,
786        sent_msg_id: Option<i64>,
787        server_msg_id: i64,
788    ) -> Result<Option<Vec<u8>>, InvocationError> {
789        if body.len() < 4 {
790            return Ok(None);
791        }
792        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
793        match cid {
794            0xf35c6d01 /* rpc_result: CID(4) + req_msg_id(8) + result */ => {
795                if body.len() >= 12
796                    && let Some(expected) = sent_msg_id {
797                        let resp_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
798                        if resp_id != expected {
799                            tracing::debug!(
800                                "[dc_pool] rpc_result req_msg_id mismatch \
801                                 (got {resp_id:#018x}, want {expected:#018x}); skipping"
802                            );
803                            return Ok(None);
804                        }
805                    }
806                let inner = if body.len() >= 12 { &body[12..] } else { body };
807                // Inner body may itself be gzip_packed (e.g. help.Config inside rpc_result).
808                if inner.len() >= 4
809                    && u32::from_le_bytes(inner[..4].try_into().unwrap()) == 0x3072cfa1
810                {
811                    let mut dummy_salt = *salt;
812                    let mut nr = false; let mut nsr = false;
813                    let mut bc = None; let mut bsi = None;
814                    if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
815                        return Ok(Some(r));
816                    }
817                    // Unwrap the gzip directly and return the decompressed bytes.
818                    if let Some(compressed) = tl_read_bytes(&inner[4..]) {
819                        let dec = flate2::read::GzDecoder::new(compressed.as_slice());
820                        let mut limited = std::io::Read::take(dec, 16 * 1024 * 1024);
821                        let mut out = Vec::new();
822                        if std::io::Read::read_to_end(&mut limited, &mut out).is_ok() {
823                            return Ok(Some(out));
824                        }
825                    }
826                    return Ok(None);
827                }
828                if inner.len() >= 8
829                    && u32::from_le_bytes(inner[..4].try_into().unwrap()) == 0x2144ca19
830                {
831                    let code = i32::from_le_bytes(inner[4..8].try_into().unwrap());
832                    let message = tl_read_string(&inner[8..]).unwrap_or_default();
833                    return Err(InvocationError::Rpc(
834                        crate::errors::RpcError::from_telegram(code, &message),
835                    ));
836                }
837                Ok(Some(inner.to_vec()))
838            }
839            0x2144ca19 /* rpc_error */ => {
840                if body.len() < 8 {
841                    return Err(InvocationError::Deserialize("rpc_error short".into()));
842                }
843                let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
844                let message = tl_read_string(&body[8..]).unwrap_or_default();
845                Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
846            }
847            0xedab447b /* bad_server_salt */ => {
848                // bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int error_code:int new_server_salt:long
849                if body.len() >= 28 {
850                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
851                    let new_salt   = i64::from_le_bytes(body[20..28].try_into().unwrap());
852                    // Only apply new salt when bad_msg_id matches our sent request;
853                    // stale frames from prior requests must not corrupt the current salt.
854                    if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
855                        *salt = new_salt;
856                        *need_resend = true;
857                    }
858                }
859                Ok(None)
860            }
861            0x9ec20908 /* new_session_created */ => {
862                // new_session_created#9ec20908 first_msg_id:long unique_id:long server_salt:long
863                // Signal need_session_reset so the caller resets seq_no before resending.
864                if body.len() >= 28 {
865                    let first_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
866                    let unique_id    = i64::from_le_bytes(body[12..20].try_into().unwrap());
867                    let server_salt  = i64::from_le_bytes(body[20..28].try_into().unwrap());
868                    tracing::debug!(
869                        "[dc_pool] new_session_created: unique_id={unique_id:#018x} \
870                         first_msg_id={first_msg_id} salt={server_salt}"
871                    );
872                    *salt = server_salt;
873                    // Only reset if the pending request predates the server's new session.
874                    // If sent_msg_id == first_msg_id (fresh worker conn on first send),
875                    // the server will reply with our current session_id. Unconditionally
876                    // calling reset_session() here changes the id, causing the response
877                    // decrypt to fail with session_id mismatch.
878                    if sent_msg_id.is_some_and(|id| id < first_msg_id) {
879                        *need_session_reset = true;
880                    }
881                }
882                Ok(None)
883            }
884            0xa7eff811 /* bad_msg_notification */ => {
885                // bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int error_code:int
886                //
887                // TL layout: body[4..12]=bad_msg_id, body[12..16]=bad_msg_seqno,
888                // body[16..20]=error_code. Previous code read [12..16] as error_code
889                // (bad_msg_seqno), so error matching always compared the wrong field.
890                if body.len() >= 20 {
891                    let bad_msg_id  = i64::from_le_bytes(body[4..12].try_into().unwrap());
892                    // body[12..16] = bad_msg_seqno, not used for recovery.
893                    let error_code  = u32::from_le_bytes(body[16..20].try_into().unwrap());
894                    tracing::debug!(
895                        "[dc_pool] bad_msg_notification: bad_msg_id={bad_msg_id:#018x} code={error_code}"
896                    );
897                    match error_code {
898                        16 | 17 => {
899                            // msg_id too low/high: time-offset correction needed.
900                            // server_msg_id upper 32 bits = server Unix timestamp.
901                            // bad_msg_id carries the client's clock, not the server's.
902                            *bad_msg_code = Some(error_code);
903                            *bad_msg_server_id = Some(server_msg_id);
904                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
905                        }
906                        32 | 33 => {
907                            // seq_no wrong.
908                            *bad_msg_code = Some(error_code);
909                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
910                        }
911                        48 => {
912                            // bad_msg code 48 = incorrect server salt. Per spec, this
913                            // arrives together with a bad_server_salt frame in the same
914                            // container that carries the new salt. If bad_server_salt was
915                            // already processed, *salt is updated and the resend uses the
916                            // correct value. If not (partial container), resend once
917                            // conservatively; the retry loop's 5-attempt cap prevents a loop.
918                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
919                            tracing::debug!(
920                                "[dc_pool] bad_msg code 48 (wrong salt): will resend with current salt"
921                            );
922                        }
923                        _ => {
924                            // Unknown code; resend to avoid the loop stalling.
925                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
926                        }
927                    }
928                }
929                Ok(None)
930            }
931            0x347773c5 /* pong */ => {
932                // Pong is returned for both internal PingDelayDisconnect (fire-and-forget)
933                // and user-invoked Ping (which has a pending invoke future waiting).
934                // pong layout: CID(4) + msg_id(8) + ping_id(8)
935                // pong.msg_id is the msg_id of the original ping request.
936                // Route back to the caller when it matches the pending sent_msg_id.
937                if body.len() >= 12
938                    && let Some(expected) = sent_msg_id
939                {
940                    let pong_req_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
941                    if pong_req_id == expected {
942                        return Ok(Some(body.to_vec()));
943                    }
944                }
945                // Internal keepalive pong - discard.
946                Ok(None)
947            }
948            0x73f1f8dc /* msg_container */ => {
949                if body.len() < 8 {
950                    return Ok(None);
951                }
952                let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
953                let mut pos = 8usize;
954                // Do not early-return: containers may bundle new_session_created + rpc_result
955                // together; all items must be processed so session/salt flags are observed.
956                let mut found: Option<Vec<u8>> = None;
957                for _ in 0..count {
958                    if pos + 16 > body.len() { break; }
959                    let inner_bytes =
960                        u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
961                    pos += 16;
962                    if pos + inner_bytes > body.len() { break; }
963                    let inner = &body[pos..pos + inner_bytes];
964                    pos += inner_bytes;
965                    if found.is_none() {
966                        if let Some(r) = Self::scan_body(inner, salt, need_resend,
967                            need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
968                            server_msg_id)?
969                        {
970                            found = Some(r);
971                            // Do NOT return  - continue processing remaining items so that
972                            // session/salt flags from co-arriving messages are observed.
973                        }
974                    } else {
975                        // Result already captured; still process remaining items for
976                        // side-effect flags (salt, session reset, bad_msg). Pass
977                        // sent_msg_id so the req_msg_id guard still filters stale
978                        // rpc_results. Passing None would bypass the guard and allow
979                        // a stale response to overwrite `found` on the next iteration.
980                        let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
981                                                bad_msg_code, bad_msg_server_id, sent_msg_id,
982                                                server_msg_id)?;
983                    }
984                }
985                Ok(found)
986            }
987            0x3072cfa1 /* gzip_packed */ => {
988                // Decompress and recurse: server wraps large responses in gzip_packed.
989                if let Some(compressed) = tl_read_bytes(&body[4..]) {
990                    let decoder = flate2::read::GzDecoder::new(compressed.as_slice());
991                    let mut limited = std::io::Read::take(decoder, 16 * 1024 * 1024);
992                    let mut decompressed = Vec::new();
993                    if std::io::Read::read_to_end(&mut limited, &mut decompressed).is_ok()
994                        && !decompressed.is_empty()
995                    {
996                        return Self::scan_body(
997                            &decompressed, salt,
998                            need_resend, need_session_reset,
999                            bad_msg_code, bad_msg_server_id,
1000                            sent_msg_id,
1001                            server_msg_id,
1002                        );
1003                    }
1004                }
1005                Ok(None)
1006            }
1007            _ => Ok(None),
1008        }
1009    }
1010
1011    /// Like `rpc_call` but accepts any `Serializable` type (not just `RemoteCall`).
1012    pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
1013        &mut self,
1014        req: &S,
1015    ) -> Result<Vec<u8>, InvocationError> {
1016        if !self.pending_acks.is_empty() {
1017            let ack_body = build_msgs_ack_body(&self.pending_acks);
1018            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1019            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
1020            self.pending_acks.clear();
1021        }
1022        let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
1023        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
1024        let mut salt_retries = 0u8;
1025        let mut session_resets = 0u8;
1026        loop {
1027            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
1028            let msg = self
1029                .enc
1030                .unpack(&mut raw)
1031                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1032            self.pending_acks.push(msg.msg_id);
1033            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
1034                let ack_body = build_msgs_ack_body(&self.pending_acks);
1035                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1036                let _ =
1037                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
1038                self.pending_acks.clear();
1039            }
1040            // Salt updated only on explicit bad_server_salt, not on every message.
1041            if msg.body.len() < 4 {
1042                return Ok(msg.body);
1043            }
1044            let mut need_resend = false;
1045            let mut need_session_reset = false;
1046            let mut bad_msg_code: Option<u32> = None;
1047            let mut bad_msg_server_id: Option<i64> = None;
1048            // Save result before handling flags; apply all before returning.
1049            let scan_result = Self::scan_body(
1050                &msg.body,
1051                &mut self.enc.salt,
1052                &mut need_resend,
1053                &mut need_session_reset,
1054                &mut bad_msg_code,
1055                &mut bad_msg_server_id,
1056                Some(sent_msg_id),
1057                msg.msg_id,
1058            )?;
1059            if need_session_reset {
1060                session_resets += 1;
1061                if session_resets > 2 {
1062                    return Err(InvocationError::Deserialize(
1063                        "new_session_created (serializable): exceeded 2 resets".into(),
1064                    ));
1065                }
1066                if !self.pending_acks.is_empty() {
1067                    let ack_body = build_msgs_ack_body(&self.pending_acks);
1068                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1069                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
1070                        .await;
1071                    self.pending_acks.clear();
1072                }
1073                if scan_result.is_none() {
1074                    let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
1075                    sent_msg_id = new_id;
1076                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
1077                }
1078            } else if need_resend {
1079                match bad_msg_code {
1080                    Some(16) | Some(17) => {
1081                        if let Some(srv_id) = bad_msg_server_id {
1082                            self.enc.correct_time_offset(srv_id);
1083                        }
1084                        // Do not call undo_seq_no (see rpc_call for explanation).
1085                    }
1086                    Some(32) | Some(33) => {
1087                        self.enc
1088                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
1089                    }
1090                    _ => {
1091                        self.enc.undo_seq_no();
1092                    }
1093                }
1094                salt_retries += 1;
1095                if salt_retries >= 5 {
1096                    return Err(InvocationError::Deserialize(
1097                        "bad_server_salt (serializable): exceeded 5 retries".into(),
1098                    ));
1099                }
1100                tracing::debug!(
1101                    "[dc_pool] resend serializable (code={bad_msg_code:?}) [{salt_retries}/5]"
1102                );
1103                if !self.pending_acks.is_empty() {
1104                    let ack_body = build_msgs_ack_body(&self.pending_acks);
1105                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1106                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
1107                        .await;
1108                    self.pending_acks.clear();
1109                }
1110                let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
1111                sent_msg_id = new_id;
1112                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
1113            }
1114            if let Some(result) = scan_result {
1115                return Ok(result);
1116            }
1117        }
1118    }
1119
1120    /// Send pre-serialized raw bytes and receive the raw response.
1121    /// Used by CDN download connections (no MTProto encryption layer).
1122    pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
1123        Self::send_abridged(&mut self.stream, body, self.cipher.as_mut()).await?;
1124        Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await
1125    }
1126
1127    async fn send_abridged(
1128        stream: &mut TcpStream,
1129        data: &[u8],
1130        cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1131    ) -> Result<(), InvocationError> {
1132        // Single write_all: avoids Nagle stalls and partial-write corruption.
1133        let words = data.len() / 4;
1134        let mut frame = if words < 0x7f {
1135            let mut v = Vec::with_capacity(1 + data.len());
1136            v.push(words as u8);
1137            v
1138        } else {
1139            let mut v = Vec::with_capacity(4 + data.len());
1140            v.extend_from_slice(&[
1141                0x7f,
1142                (words & 0xff) as u8,
1143                ((words >> 8) & 0xff) as u8,
1144                ((words >> 16) & 0xff) as u8,
1145            ]);
1146            v
1147        };
1148        frame.extend_from_slice(data);
1149        if let Some(c) = cipher {
1150            c.encrypt(&mut frame);
1151        }
1152        stream.write_all(&frame).await?;
1153        Ok(())
1154    }
1155
1156    async fn recv_abridged(
1157        stream: &mut TcpStream,
1158        mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1159    ) -> Result<Vec<u8>, InvocationError> {
1160        // 60-second recv timeout: prevents hung reads on silently closed connections.
1161        use tokio::time::{Duration, timeout};
1162        const RECV_TIMEOUT: Duration = Duration::from_secs(60);
1163
1164        let mut h = [0u8; 1];
1165        timeout(RECV_TIMEOUT, stream.read_exact(&mut h))
1166            .await
1167            .map_err(|_| {
1168                InvocationError::Io(std::io::Error::new(
1169                    std::io::ErrorKind::TimedOut,
1170                    "transfer recv: header timeout (60 s)",
1171                ))
1172            })??;
1173        if let Some(ref mut c) = cipher.as_mut() {
1174            c.decrypt(&mut h);
1175        }
1176
1177        // 0x7f = extended length; next 3 bytes are the LE word count.
1178        let words = if h[0] == 0x7f {
1179            let mut b = [0u8; 3];
1180            timeout(RECV_TIMEOUT, stream.read_exact(&mut b))
1181                .await
1182                .map_err(|_| {
1183                    InvocationError::Io(std::io::Error::new(
1184                        std::io::ErrorKind::TimedOut,
1185                        "transfer recv: length timeout (60 s)",
1186                    ))
1187                })??;
1188            if let Some(ref mut c) = cipher.as_mut() {
1189                c.decrypt(&mut b);
1190            }
1191            b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
1192        } else {
1193            h[0] as usize
1194        };
1195
1196        let mut buf = vec![0u8; words * 4];
1197        timeout(RECV_TIMEOUT, stream.read_exact(&mut buf))
1198            .await
1199            .map_err(|_| {
1200                InvocationError::Io(std::io::Error::new(
1201                    std::io::ErrorKind::TimedOut,
1202                    "transfer recv: body timeout (60 s)",
1203                ))
1204            })??;
1205        if let Some(c) = cipher {
1206            c.decrypt(&mut buf);
1207        }
1208
1209        // Transport errors are exactly 4 bytes (negative LE i32).
1210        // A valid encrypted MTProto frame is always ≥ 68 bytes:
1211        //   auth_key_id(8) + msg_key(16) + encrypted[salt(8)+session_id(8)+
1212        //   msg_id(8)+seq_no(4)+data_len(4)+body(≥4)+padding(≥12)] ≥ 68 bytes.
1213        // Checking buf.len() == 4 is therefore both necessary and sufficient to
1214        // distinguish a transport error from a valid encrypted frame; this is
1215        // correct by protocol structure, not merely empirically safe.
1216        if buf.len() == 4 {
1217            let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1218            if code < 0 {
1219                return Err(InvocationError::Io(std::io::Error::new(
1220                    std::io::ErrorKind::ConnectionRefused,
1221                    format!("transport error from server: {code}"),
1222                )));
1223            }
1224        }
1225
1226        Ok(buf)
1227    }
1228
1229    async fn send_plain_frame(
1230        stream: &mut TcpStream,
1231        data: &[u8],
1232        cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1233    ) -> Result<(), InvocationError> {
1234        // Abridged framing uses word-count (len/4): pad to 4-byte boundary.
1235        // TL parsers ignore trailing zero bytes.
1236        if !data.len().is_multiple_of(4) {
1237            let mut padded = data.to_vec();
1238            let pad = 4 - (data.len() % 4);
1239            padded.resize(data.len() + pad, 0);
1240            Self::send_abridged(stream, &padded, cipher).await
1241        } else {
1242            Self::send_abridged(stream, data, cipher).await
1243        }
1244    }
1245
1246    async fn recv_plain_frame<T: Deserializable>(
1247        stream: &mut TcpStream,
1248        cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1249    ) -> Result<T, InvocationError> {
1250        let raw = Self::recv_abridged(stream, cipher).await?;
1251        // A 4-byte negative payload is a transport error code from the server.
1252        // Surface it directly rather than masking it with "plain frame too short".
1253        if raw.len() == 4 {
1254            let code = i32::from_le_bytes(raw[..4].try_into().unwrap());
1255            if code < 0 {
1256                return Err(InvocationError::Deserialize(format!(
1257                    "server transport error during DH: code {code}"
1258                )));
1259            }
1260        }
1261        if raw.len() < 20 {
1262            return Err(InvocationError::Deserialize("plain frame too short".into()));
1263        }
1264        if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1265            return Err(InvocationError::Deserialize(
1266                "expected auth_key_id=0 in plaintext".into(),
1267            ));
1268        }
1269        let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1270        if raw.len() < 20 + body_len {
1271            return Err(InvocationError::Deserialize(format!(
1272                "plain frame truncated: have {} bytes, need {}",
1273                raw.len(),
1274                20 + body_len
1275            )));
1276        }
1277        let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1278        T::deserialize(&mut cur).map_err(Into::into)
1279    }
1280}
1281
1282/// Check a decrypted PFS bind response body for boolTrue.
1283/// Decode one bare MTProto message body for the auth.bindTempAuthKey response (pool path).
1284fn pfs_pool_decode_bind_single(body: &[u8]) -> Result<(), String> {
1285    const RPC_RESULT: u32 = 0xf35c6d01;
1286    const BOOL_TRUE: u32 = 0x9972_75b5;
1287    const BOOL_FALSE: u32 = 0xbc79_9737;
1288    const RPC_ERROR: u32 = 0x2144_ca19;
1289    const BAD_MSG: u32 = 0xa7ef_f811;
1290    const BAD_SALT: u32 = 0xedab_447b;
1291    const NEW_SESSION: u32 = 0x9ec2_0908;
1292    const FUTURE_SALTS: u32 = 0xae50_0895;
1293    const MSGS_ACK: u32 = 0x62d6_b459; // msgs_ack#62d6b459
1294    const PONG: u32 = 0x0347_73c5;
1295
1296    if body.len() < 4 {
1297        return Err("skip".into());
1298    }
1299    let ctor = u32::from_le_bytes(body[..4].try_into().unwrap());
1300
1301    match ctor {
1302        BOOL_TRUE => Ok(()),
1303        BOOL_FALSE => Err("server returned boolFalse (binding rejected)".into()),
1304        NEW_SESSION | FUTURE_SALTS | MSGS_ACK | PONG => Err("skip".into()),
1305
1306        RPC_RESULT if body.len() >= 16 => {
1307            let inner = u32::from_le_bytes(body[12..16].try_into().unwrap());
1308            match inner {
1309                BOOL_TRUE => Ok(()),
1310                BOOL_FALSE => Err("rpc_result{boolFalse} (server rejected binding)".into()),
1311                RPC_ERROR if body.len() >= 20 => {
1312                    let code = i32::from_le_bytes(body[16..20].try_into().unwrap());
1313                    let msg = tl_read_string(body.get(20..).unwrap_or(&[])).unwrap_or_default();
1314                    Err(format!("rpc_error code={code} message={msg:?}"))
1315                }
1316                _ => Err(format!("rpc_result inner ctor={inner:#010x}")),
1317            }
1318        }
1319
1320        BAD_MSG if body.len() >= 16 => {
1321            let code = u32::from_le_bytes(body[12..16].try_into().unwrap());
1322            let desc = match code {
1323                16 => "msg_id too low (clock skew)",
1324                17 => "msg_id too high (clock skew)",
1325                18 => "incorrect lower 2 bits of msg_id",
1326                19 => "duplicate msg_id",
1327                20 => "message too old (>300s)",
1328                32 => "msg_seqno too low",
1329                33 => "msg_seqno too high",
1330                48 => "incorrect server salt",
1331                _ => "unknown code",
1332            };
1333            Err(format!("bad_msg_notification code={code} ({desc})"))
1334        }
1335
1336        BAD_SALT if body.len() >= 24 => {
1337            let new_salt = i64::from_le_bytes(body[16..24].try_into().unwrap());
1338            Err(format!(
1339                "bad_server_salt, server wants salt={new_salt:#018x}"
1340            ))
1341        }
1342
1343        _ => Err(format!("unknown ctor={ctor:#010x}")),
1344    }
1345}
1346
1347/// Decode the server response to auth.bindTempAuthKey (pool path).
1348///
1349/// Handles bare messages AND msg_container (the server frequently bundles
1350/// new_session_created + rpc_result together in a container).
1351fn pfs_pool_decode_bind_response(body: &[u8]) -> Result<(), String> {
1352    const MSG_CONTAINER: u32 = 0x73f1f8dc;
1353
1354    if body.len() < 4 {
1355        return Err(format!("response body too short ({} bytes)", body.len()));
1356    }
1357    let ctor = u32::from_le_bytes(body[..4].try_into().unwrap());
1358
1359    if ctor != MSG_CONTAINER {
1360        return pfs_pool_decode_bind_single(body).map_err(|e| {
1361            if e == "skip" {
1362                "__need_more__".into()
1363            } else {
1364                e
1365            }
1366        });
1367    }
1368
1369    if body.len() < 8 {
1370        return Err("msg_container too short to read count".into());
1371    }
1372    let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1373    let mut pos = 8usize;
1374    let mut last_real_err: Option<String> = None;
1375
1376    for i in 0..count {
1377        if pos + 16 > body.len() {
1378            return Err(format!(
1379                "msg_container truncated at message {i}/{count} (pos={pos} body_len={})",
1380                body.len()
1381            ));
1382        }
1383        let msg_bytes = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1384        pos += 16;
1385
1386        if pos + msg_bytes > body.len() {
1387            return Err(format!(
1388                "msg_container message {i} body overflows (need {msg_bytes}, have {})",
1389                body.len() - pos
1390            ));
1391        }
1392        let msg_body = &body[pos..pos + msg_bytes];
1393        pos += msg_bytes;
1394
1395        match pfs_pool_decode_bind_single(msg_body) {
1396            Ok(()) => return Ok(()),
1397            Err(e) if e == "skip" => continue,
1398            Err(e) => {
1399                last_real_err = Some(e);
1400            }
1401        }
1402    }
1403
1404    Err(last_real_err.unwrap_or_else(|| "__need_more__".into()))
1405}
1406
/// Read a TL-encoded byte string from the start of `data` and return a copy of
/// its payload.
///
/// A first byte below 254 is the payload length itself; otherwise the next
/// three bytes hold the length as a little-endian integer. Empty input yields
/// an empty payload. Returns `None` when the length header or the payload does
/// not fit inside `data`. Bytes after the payload are ignored.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let marker = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };

    let (offset, size) = if marker >= 254 {
        // Long form: marker byte followed by a 24-bit little-endian length.
        let ext = data.get(1..4)?;
        let size = usize::from(ext[0]) | usize::from(ext[1]) << 8 | usize::from(ext[2]) << 16;
        (4, size)
    } else {
        // Short form: the marker byte is the length.
        (1, usize::from(marker))
    };

    data.get(offset..offset + size)
        .map(|payload| payload.to_vec())
}
1426
1427fn tl_read_string(data: &[u8]) -> Option<String> {
1428    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
1429}