Skip to main content

ferogram_mtsender/
sender.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13use ferogram_mtproto::{
14    EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
15};
16use ferogram_tl_types as tl;
17use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::TcpStream;
20
21use crate::errors::InvocationError;
22use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
23use ferogram_connect::TransportKind;
24// metrics and tracing
25#[allow(unused_imports)]
26use metrics::{counter, histogram};
27
/// Un-acked server msg_ids to accumulate before eagerly flushing a `msgs_ack` frame.
///
/// Once the pending list reaches this length while responses are being read,
/// the acks are sent immediately instead of waiting for the next request.
const PENDING_ACKS_THRESHOLD: usize = 10;

/// `PingDelayDisconnect` interval for worker connections (in GetFile chunks).
/// Keeps the socket alive within Telegram's 75-second idle-disconnect window.
///
/// The interval is counted in `rpc_call` invocations: a ping is sent whenever
/// the per-connection call counter is a multiple of this value.
const PING_EVERY_N_CHUNKS: u32 = 5;
35
36pub struct DcConnection {
37    stream: TcpStream,
38    enc: EncryptedSession,
39    pending_acks: Vec<i64>,
40    call_count: u32,
41    /// AES-256-CTR cipher for obfuscated transport; None for plain transports.
42    cipher: Option<ferogram_crypto::ObfuscatedCipher>,
43    /// Persistent dedup ring that outlives individual EncryptedSessions.
44    #[allow(dead_code)]
45    seen_msg_ids: SeenMsgIds,
46}
47
48impl DcConnection {
49    /// Race Obfuscated / Abridged / Http transports and return the first to succeed.
50    #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
51    pub async fn connect_fastest(
52        addr: &str,
53        socks5: Option<&ferogram_connect::Socks5Config>,
54        dc_id: i16,
55    ) -> Result<(Self, &'static str), InvocationError> {
56        use tokio::task::JoinSet;
57        let addr = addr.to_owned();
58        let socks5 = socks5.cloned();
59        tracing::debug!("[dc_pool] probing {addr} with 3 transports");
60        let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
61            JoinSet::new();
62
63        {
64            let a = addr.clone();
65            let s = socks5.clone();
66            set.spawn(async move {
67                Ok((
68                    DcConnection::connect_raw(
69                        &a,
70                        s.as_ref(),
71                        &TransportKind::Obfuscated { secret: None },
72                        dc_id,
73                    )
74                    .await?,
75                    "Obfuscated",
76                ))
77            });
78        }
79        {
80            let a = addr.clone();
81            let s = socks5.clone();
82            set.spawn(async move {
83                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
84                Ok((
85                    DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
86                        .await?,
87                    "Abridged",
88                ))
89            });
90        }
91        {
92            let a = addr.clone();
93            set.spawn(async move {
94                tokio::time::sleep(std::time::Duration::from_millis(800)).await;
95                Ok((
96                    DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
97                    "Http",
98                ))
99            });
100        }
101
102        let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
103        while let Some(outcome) = set.join_next().await {
104            match outcome {
105                Ok(Ok((conn, label))) => {
106                    set.abort_all();
107                    return Ok((conn, label));
108                }
109                Ok(Err(e)) => {
110                    last_err = e;
111                }
112                Err(e) if e.is_cancelled() => {}
113                Err(_) => {}
114            }
115        }
116        Err(last_err)
117    }
118
119    /// Connect and perform full DH handshake.
120    #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
121    pub async fn connect_raw(
122        addr: &str,
123        socks5: Option<&ferogram_connect::Socks5Config>,
124        transport: &TransportKind,
125        dc_id: i16,
126    ) -> Result<Self, InvocationError> {
127        tracing::debug!("[dc_pool] Connecting to {addr} …");
128        let mut stream = Self::open_tcp(addr, socks5).await?;
129        let mut cipher = Self::send_transport_init(&mut stream, transport, dc_id).await?;
130
131        let mut plain = Session::new();
132
133        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
134        Self::send_plain_frame(
135            &mut stream,
136            &plain.pack(&req1).to_plaintext_bytes(),
137            cipher.as_mut(),
138        )
139        .await?;
140        let res_pq: tl::enums::ResPq = Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
141
142        let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
143            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
144        Self::send_plain_frame(
145            &mut stream,
146            &plain.pack(&req2).to_plaintext_bytes(),
147            cipher.as_mut(),
148        )
149        .await?;
150        let dh: tl::enums::ServerDhParams =
151            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
152
153        let (req3, s3) =
154            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
155        Self::send_plain_frame(
156            &mut stream,
157            &plain.pack(&req3).to_plaintext_bytes(),
158            cipher.as_mut(),
159        )
160        .await?;
161        let ans: tl::enums::SetClientDhParamsAnswer =
162            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
163
164        // Retry loop for dh_gen_retry (up to 5 attempts).
165        let done = {
166            let mut result =
167                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
168            let mut attempts = 0u8;
169            loop {
170                match result {
171                    auth::FinishResult::Done(d) => break d,
172                    auth::FinishResult::Retry {
173                        retry_id,
174                        dh_params,
175                        nonce,
176                        server_nonce,
177                        new_nonce,
178                    } => {
179                        attempts += 1;
180                        if attempts >= 5 {
181                            return Err(InvocationError::Deserialize(
182                                "dh_gen_retry exceeded 5 attempts".into(),
183                            ));
184                        }
185                        let (req_retry, s3_retry) =
186                            auth::retry_step3(&dh_params, nonce, server_nonce, new_nonce, retry_id)
187                                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
188                        Self::send_plain_frame(
189                            &mut stream,
190                            &plain.pack(&req_retry).to_plaintext_bytes(),
191                            cipher.as_mut(),
192                        )
193                        .await?;
194                        let ans_retry: tl::enums::SetClientDhParamsAnswer =
195                            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
196                        result = auth::finish(s3_retry, ans_retry)
197                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
198                    }
199                }
200            }
201        };
202        tracing::debug!("[dc_pool] DH complete ✓ for {addr}");
203
204        let seen = new_seen_msg_ids();
205        Ok(Self {
206            stream,
207            cipher,
208            enc: EncryptedSession::with_seen(
209                done.auth_key,
210                done.first_salt,
211                done.time_offset,
212                seen.clone(),
213            ),
214            pending_acks: Vec::new(),
215            call_count: 0,
216            seen_msg_ids: seen,
217        })
218    }
219
220    /// Connect with an already-known auth key (no DH needed).
221    /// If `pfs` is true, performs a temp-key DH bind before any RPCs.
222    #[allow(clippy::too_many_arguments)]
223    pub async fn connect_with_key(
224        addr: &str,
225        auth_key: [u8; 256],
226        first_salt: i64,
227        time_offset: i32,
228        socks5: Option<&ferogram_connect::Socks5Config>,
229        mtproxy: Option<&ferogram_connect::MtProxyConfig>,
230        transport: &TransportKind,
231        dc_id: i16,
232        pfs: bool,
233    ) -> Result<Self, InvocationError> {
234        let (mut stream, mut cipher) = if let Some(mp) = mtproxy {
235            let mut s = mp.connect().await?;
236            s.set_nodelay(true)?;
237            let c = Self::send_transport_init(&mut s, &mp.transport, dc_id).await?;
238            (s, c)
239        } else {
240            let mut s = Self::open_tcp(addr, socks5).await?;
241            let c = Self::send_transport_init(&mut s, transport, dc_id).await?;
242            (s, c)
243        };
244
245        if pfs {
246            tracing::debug!("[dc_pool] PFS: temp DH bind for DC{dc_id}");
247            match Self::do_pool_pfs_bind(&mut stream, cipher.as_mut(), &auth_key, dc_id).await {
248                Ok(temp_enc) => {
249                    tracing::info!("[dc_pool] PFS bind complete DC{dc_id}");
250                    return Ok(Self {
251                        stream,
252                        cipher,
253                        enc: temp_enc,
254                        pending_acks: Vec::new(),
255                        call_count: 0,
256                        seen_msg_ids: new_seen_msg_ids(),
257                    });
258                }
259                Err(e) => {
260                    tracing::warn!("[dc_pool] PFS bind failed DC{dc_id} ({e}); falling back");
261                    return Err(e);
262                }
263            }
264        }
265
266        let seen = new_seen_msg_ids();
267        Ok(Self {
268            stream,
269            cipher,
270            enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
271            pending_acks: Vec::new(),
272            call_count: 0,
273            seen_msg_ids: seen,
274        })
275    }
276
277    /// Temp-key DH handshake + auth.bindTempAuthKey on an existing stream.
278    #[allow(clippy::needless_option_as_deref)]
279    async fn do_pool_pfs_bind(
280        stream: &mut tokio::net::TcpStream,
281        mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
282        perm_auth_key: &[u8; 256],
283        dc_id: i16,
284    ) -> Result<EncryptedSession, InvocationError> {
285        use ferogram_mtproto::{
286            auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
287            serialize_bind_temp_auth_key,
288        };
289        const TEMP_EXPIRES: i32 = 86_400; // 24 h
290
291        // temp-key DH
292        let mut plain = ferogram_mtproto::Session::new();
293
294        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
295        Self::send_plain_frame(
296            stream,
297            &plain.pack(&req1).to_plaintext_bytes(),
298            cipher.as_deref_mut(),
299        )
300        .await?;
301        let res_pq: tl::enums::ResPq =
302            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
303
304        let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
305            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
306        Self::send_plain_frame(
307            stream,
308            &plain.pack(&req2).to_plaintext_bytes(),
309            cipher.as_deref_mut(),
310        )
311        .await?;
312        let dh: tl::enums::ServerDhParams =
313            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
314
315        let (req3, s3) =
316            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
317        Self::send_plain_frame(
318            stream,
319            &plain.pack(&req3).to_plaintext_bytes(),
320            cipher.as_deref_mut(),
321        )
322        .await?;
323        let ans: tl::enums::SetClientDhParamsAnswer =
324            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
325
326        let done = {
327            let mut result =
328                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
329            let mut attempts = 0u8;
330            loop {
331                match result {
332                    ferogram_mtproto::FinishResult::Done(d) => break d,
333                    ferogram_mtproto::FinishResult::Retry {
334                        retry_id,
335                        dh_params,
336                        nonce,
337                        server_nonce,
338                        new_nonce,
339                    } => {
340                        attempts += 1;
341                        if attempts >= 5 {
342                            return Err(InvocationError::Deserialize(
343                                "PFS pool temp DH retry exceeded 5".into(),
344                            ));
345                        }
346                        let (rr, s3r) = ferogram_mtproto::retry_step3(
347                            &dh_params,
348                            nonce,
349                            server_nonce,
350                            new_nonce,
351                            retry_id,
352                        )
353                        .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
354                        Self::send_plain_frame(
355                            stream,
356                            &plain.pack(&rr).to_plaintext_bytes(),
357                            cipher.as_deref_mut(),
358                        )
359                        .await?;
360                        let ar: tl::enums::SetClientDhParamsAnswer =
361                            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
362                        result = auth::finish(s3r, ar)
363                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
364                    }
365                }
366            }
367        };
368
369        let temp_key = done.auth_key;
370        let temp_salt = done.first_salt;
371        let temp_offset = done.time_offset;
372
373        // build bindTempAuthKey body
374        let temp_key_id = auth_key_id_from_key(&temp_key);
375        let perm_key_id = auth_key_id_from_key(perm_auth_key);
376
377        let mut nonce_buf = [0u8; 8];
378        getrandom::getrandom(&mut nonce_buf)
379            .map_err(|_| InvocationError::Deserialize("getrandom nonce".into()))?;
380        let nonce = i64::from_le_bytes(nonce_buf);
381
382        let server_now = std::time::SystemTime::now()
383            .duration_since(std::time::UNIX_EPOCH)
384            .expect("system clock is before UNIX epoch")
385            .as_secs() as i32
386            + temp_offset;
387        let expires_at = server_now + TEMP_EXPIRES;
388
389        let seen = new_seen_msg_ids();
390        let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
391        let temp_session_id = temp_enc.session_id();
392
393        let msg_id = gen_msg_id();
394        let enc_msg = encrypt_bind_inner(
395            perm_auth_key,
396            msg_id,
397            nonce,
398            temp_key_id,
399            perm_key_id,
400            temp_session_id,
401            expires_at,
402        );
403        let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
404
405        // send encrypted bind request
406        let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
407        Self::send_abridged(stream, &wire, cipher.as_deref_mut()).await?;
408
409        // Receive and verify response.
410        // The server may send informational frames first (msgs_ack, new_session_created)
411        // before the actual rpc_result{boolTrue}, so we loop up to 5 frames.
412        for attempt in 0u8..5 {
413            let mut raw = Self::recv_abridged(stream, cipher.as_deref_mut()).await?;
414            let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
415                InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
416            })?;
417            match pfs_pool_decode_bind_response(&decrypted.body) {
418                Ok(()) => {
419                    // bindTempAuthKey succeeds under the temp key; keep the session
420                    // sequence as-is so subsequent RPCs continue from the same MTProto
421                    // message stream.
422                    return Ok(temp_enc);
423                }
424                Err(ref e) if e == "__need_more__" => {
425                    tracing::debug!(
426                        "[ferogram] PFS pool bind (DC{dc_id}): informational frame {attempt}, reading next"
427                    );
428                    continue;
429                }
430                Err(reason) => {
431                    tracing::error!(
432                        "[ferogram] PFS pool bind server response (DC{dc_id}): {reason}"
433                    );
434                    return Err(InvocationError::Deserialize(format!(
435                        "auth.bindTempAuthKey (pool): {reason}"
436                    )));
437                }
438            }
439        }
440        Err(InvocationError::Deserialize(
441            "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
442        ))
443    }
444
445    async fn open_tcp(
446        addr: &str,
447        socks5: Option<&ferogram_connect::Socks5Config>,
448    ) -> Result<TcpStream, InvocationError> {
449        let stream = match socks5 {
450            Some(proxy) => proxy.connect(addr).await?,
451            None => TcpStream::connect(addr).await?,
452        };
453        // Disable Nagle for immediate single-frame delivery.
454        stream.set_nodelay(true)?;
455        // SO_KEEPALIVE keeps worker connections alive across idle periods.
456        {
457            let sock = socket2::SockRef::from(&stream);
458            let ka = socket2::TcpKeepalive::new()
459                .with_time(std::time::Duration::from_secs(10))
460                .with_interval(std::time::Duration::from_secs(5));
461            #[cfg(not(target_os = "windows"))]
462            let ka = ka.with_retries(3);
463            sock.set_tcp_keepalive(&ka).ok();
464        }
465        Ok(stream)
466    }
467
468    async fn send_transport_init(
469        stream: &mut TcpStream,
470        transport: &TransportKind,
471        dc_id: i16,
472    ) -> Result<Option<ferogram_crypto::ObfuscatedCipher>, InvocationError> {
473        match transport {
474            TransportKind::Abridged => {
475                stream.write_all(&[0xef]).await?;
476            }
477            TransportKind::Intermediate => {
478                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
479            }
480            TransportKind::Full => {}
481            TransportKind::Obfuscated { secret } => {
482                use sha2::Digest;
483                let mut nonce = [0u8; 64];
484                loop {
485                    getrandom::getrandom(&mut nonce)
486                        .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
487                    let first =
488                        u32::from_le_bytes(nonce[0..4].try_into().expect("nonce is [u8;64]"));
489                    let second =
490                        u32::from_le_bytes(nonce[4..8].try_into().expect("nonce is [u8;64]"));
491                    let bad = nonce[0] == 0xEF
492                        || first == 0x44414548
493                        || first == 0x54534F50
494                        || first == 0x20544547
495                        || first == 0x4954504f  // OPTIONS
496                        || first == 0xEEEEEEEE
497                        || first == 0xDDDDDDDD
498                        || first == 0x02010316
499                        || second == 0x00000000;
500                    if !bad {
501                        break;
502                    }
503                }
504                let tx_raw: [u8; 32] = nonce[8..40].try_into().expect("nonce is [u8;64]");
505                let tx_iv: [u8; 16] = nonce[40..56].try_into().expect("nonce is [u8;64]");
506                let mut rev48 = nonce[8..56].to_vec();
507                rev48.reverse();
508                let rx_raw: [u8; 32] = rev48[0..32]
509                    .try_into()
510                    .expect("rev48 is nonce[8..56].reversed(), len=48");
511                let rx_iv: [u8; 16] = rev48[32..48]
512                    .try_into()
513                    .expect("rev48 is nonce[8..56].reversed(), len=48");
514                let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
515                    let mut h = sha2::Sha256::new();
516                    h.update(tx_raw);
517                    h.update(s.as_ref());
518                    let tx: [u8; 32] = h.finalize().into();
519                    let mut h = sha2::Sha256::new();
520                    h.update(rx_raw);
521                    h.update(s.as_ref());
522                    let rx: [u8; 32] = h.finalize().into();
523                    (tx, rx)
524                } else {
525                    (tx_raw, rx_raw)
526                };
527                nonce[56] = 0xef;
528                nonce[57] = 0xef;
529                nonce[58] = 0xef;
530                nonce[59] = 0xef;
531                let dc_bytes = dc_id.to_le_bytes();
532                nonce[60] = dc_bytes[0];
533                nonce[61] = dc_bytes[1];
534                let mut enc =
535                    ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
536                let mut skip = [0u8; 56];
537                enc.encrypt(&mut skip);
538                enc.encrypt(&mut nonce[56..64]);
539                stream.write_all(&nonce).await?;
540                return Ok(Some(enc));
541            }
542            TransportKind::PaddedIntermediate { secret } => {
543                use sha2::Digest;
544                let mut nonce = [0u8; 64];
545                loop {
546                    getrandom::getrandom(&mut nonce)
547                        .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
548                    let first =
549                        u32::from_le_bytes(nonce[0..4].try_into().expect("nonce is [u8;64]"));
550                    let second =
551                        u32::from_le_bytes(nonce[4..8].try_into().expect("nonce is [u8;64]"));
552                    let bad = nonce[0] == 0xEF
553                        || first == 0x44414548
554                        || first == 0x54534F50
555                        || first == 0x20544547
556                        || first == 0x4954504f
557                        || first == 0xEEEEEEEE
558                        || first == 0xDDDDDDDD
559                        || first == 0x02010316
560                        || second == 0x00000000;
561                    if !bad {
562                        break;
563                    }
564                }
565                let tx_raw: [u8; 32] = nonce[8..40].try_into().expect("nonce is [u8;64]");
566                let tx_iv: [u8; 16] = nonce[40..56].try_into().expect("nonce is [u8;64]");
567                let mut rev48 = nonce[8..56].to_vec();
568                rev48.reverse();
569                let rx_raw: [u8; 32] = rev48[0..32]
570                    .try_into()
571                    .expect("rev48 is nonce[8..56].reversed(), len=48");
572                let rx_iv: [u8; 16] = rev48[32..48]
573                    .try_into()
574                    .expect("rev48 is nonce[8..56].reversed(), len=48");
575                let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
576                    let mut h = sha2::Sha256::new();
577                    h.update(tx_raw);
578                    h.update(s.as_ref());
579                    let tx: [u8; 32] = h.finalize().into();
580                    let mut h = sha2::Sha256::new();
581                    h.update(rx_raw);
582                    h.update(s.as_ref());
583                    let rx: [u8; 32] = h.finalize().into();
584                    (tx, rx)
585                } else {
586                    (tx_raw, rx_raw)
587                };
588                nonce[56] = 0xdd;
589                nonce[57] = 0xdd;
590                nonce[58] = 0xdd;
591                nonce[59] = 0xdd;
592                let dc_bytes = dc_id.to_le_bytes();
593                nonce[60] = dc_bytes[0];
594                nonce[61] = dc_bytes[1];
595                let mut enc =
596                    ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
597                let mut skip = [0u8; 56];
598                enc.encrypt(&mut skip);
599                enc.encrypt(&mut nonce[56..64]);
600                stream.write_all(&nonce).await?;
601                return Ok(Some(enc));
602            }
603            TransportKind::FakeTls { .. } => {
604                // FakeTls requires a full TLS 1.2 ClientHello handshake which is not yet
605                // implemented in DcPool worker connections. Use Obfuscated or
606                // PaddedIntermediate for proxy connections instead.
607                return Err(InvocationError::Deserialize(
608                    "FakeTls transport is not supported for DcPool connections".into(),
609                ));
610            }
611            TransportKind::Http => {}
612        }
613        Ok(None)
614    }
615
616    pub fn auth_key_bytes(&self) -> [u8; 256] {
617        self.enc.auth_key_bytes()
618    }
619    pub fn first_salt(&self) -> i64 {
620        self.enc.salt
621    }
622    pub fn time_offset(&self) -> i32 {
623        self.enc.time_offset
624    }
625
626    #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
627    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
628        let _t0 = std::time::Instant::now();
629        // Periodic PingDelayDisconnect: sent before the request to piggyback on
630        // the same TCP write window.  Keeps the socket alive across the download.
631        self.call_count += 1;
632        if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
633            let ping_id = self.call_count as i64;
634            let ping_body = build_msgs_ack_ping_body(ping_id);
635            // PingDelayDisconnect is content-related (returns Pong): must use odd seq_no.
636            let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
637            // This ping is fire-and-forget. The Pong response is a content-related
638            // server message and must be acknowledged. If the RPC result arrives before
639            // the Pong, the Pong's msg_id is never added to pending_acks. On idle
640            // connections (no subsequent RPCs) the un-acked Pong will eventually cause
641            // Telegram to close the connection. A dedicated always-running reader task
642            // that drains and acks all server messages would fix this permanently; for
643            // now the next rpc_call iteration receives and acks the Pong via pending_acks.
644            let _ = Self::send_abridged(&mut self.stream, &ping_wire, self.cipher.as_mut()).await;
645        }
646
647        // Flush pending acks.
648        if !self.pending_acks.is_empty() {
649            let ack_body = build_msgs_ack_body(&self.pending_acks);
650            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
651            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
652            self.pending_acks.clear();
653        }
654
655        // Track sent msg_id to verify rpc_result.req_msg_id and discard stale responses.
656        let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
657        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
658        let mut salt_retries = 0u8;
659        let mut session_resets = 0u8;
660        loop {
661            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
662            let msg = self
663                .enc
664                .unpack(&mut raw)
665                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
666            // Track every received msg_id for acknowledgement.
667            self.pending_acks.push(msg.msg_id);
668            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
669                // Eager flush: too many un-acked messages  - Telegram will close the
670                // connection if we don't ack within its window.
671                let ack_body = build_msgs_ack_body(&self.pending_acks);
672                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
673                let _ =
674                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
675                self.pending_acks.clear();
676            }
677            // Salt is updated only on explicit bad_server_salt, not on every message.
678            if msg.body.len() < 4 {
679                return Ok(msg.body);
680            }
681            let mut need_resend = false;
682            let mut need_session_reset = false;
683            let mut bad_msg_code: Option<u32> = None;
684            let mut bad_msg_server_id: Option<i64> = None;
685            // Process all flags before returning: containers may carry
686            // new_session_created + rpc_result together.
687            let scan_result = Self::scan_body(
688                &msg.body,
689                &mut self.enc.salt,
690                &mut need_resend,
691                &mut need_session_reset,
692                &mut bad_msg_code,
693                &mut bad_msg_server_id,
694                Some(sent_msg_id),
695                msg.msg_id,
696            )?;
697            // new_session_created requires seq_no reset to 0.
698            if need_session_reset {
699                session_resets += 1;
700                if session_resets > 2 {
701                    return Err(InvocationError::Deserialize(
702                        "new_session_created: exceeded 2 resets".into(),
703                    ));
704                }
705                if !self.pending_acks.is_empty() {
706                    let ack_body = build_msgs_ack_body(&self.pending_acks);
707                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
708                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
709                        .await;
710                    self.pending_acks.clear();
711                }
712                // Keep the current session sequence. new_session_created updates the
713                // server salt and may require resending stale requests, but it does
714                // not require zeroing the local MTProto seq counter.
715                if scan_result.is_none() {
716                    // No result yet; resend using the current MTProto sequence.
717                    tracing::debug!(
718                        "[dc_pool] new_session_created: resending [{session_resets}/2]"
719                    );
720                    let (wire, new_id) = self.enc.pack_with_msg_id(req);
721                    sent_msg_id = new_id;
722                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
723                }
724                // If scan_result.is_some(), the result arrived in the same container
725                // as new_session_created; session has been reset for future calls,
726                // fall through to return the result.
727            } else if need_resend {
728                // Apply seq_no / time corrections from bad_msg_notification.
729                match bad_msg_code {
730                    Some(16) | Some(17) => {
731                        if let Some(srv_id) = bad_msg_server_id {
732                            self.enc.correct_time_offset(srv_id);
733                        }
734                        // Do not call undo_seq_no here. Reusing the same seq_no on a
735                        // retry violates MTProto monotonicity; the server may reject
736                        // with code 32. Let the next pack_with_msg_id assign the next
737                        // available odd seq_no for the resent message.
738                    }
739                    Some(32) | Some(33) => {
740                        // correct_seq_no does a full session reset (new session_id,
741                        // seq_no=0) instead of magic +/- offsets.
742                        self.enc
743                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
744                    }
745                    _ => {
746                        // bad_server_salt or bad_msg code 48
747                        self.enc.undo_seq_no();
748                    }
749                }
750                salt_retries += 1;
751                if salt_retries >= 5 {
752                    return Err(InvocationError::Deserialize(
753                        "bad_server_salt/bad_msg: exceeded 5 retries".into(),
754                    ));
755                }
756                tracing::debug!(
757                    "[dc_pool] resend in transfer conn (code={bad_msg_code:?}) [{salt_retries}/5]"
758                );
759                if !self.pending_acks.is_empty() {
760                    let ack_body = build_msgs_ack_body(&self.pending_acks);
761                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
762                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
763                        .await;
764                    self.pending_acks.clear();
765                }
766                let (wire, new_id) = self.enc.pack_with_msg_id(req);
767                sent_msg_id = new_id;
768                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
769            }
770            if let Some(result) = scan_result {
771                metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
772                metrics::histogram!("ferogram.rpc_latency_ms")
773                    .record(_t0.elapsed().as_millis() as f64);
774                return Ok(result);
775            }
776        }
777    }
778    ///
779    /// Returns `Ok(Some(bytes))` when rpc_result is found.
780    /// Returns `Ok(None)` for informational messages (continue reading).
781    /// Returns `Err` for rpc_error or parse failures.
782    ///
783    /// Output flags:
784    /// - `need_resend`: set for bad_server_salt / bad_msg_notification (codes 16/17/32/33/48)
785    /// - `need_session_reset`: set for new_session_created (seq_no must reset to 0)
786    /// - `bad_msg_code`: error_code from bad_msg_notification for caller to apply correction
787    /// - `bad_msg_server_id`: server msg_id for time-offset correction (codes 16/17)
788    /// - `server_msg_id`: outer frame msg_id for time-offset correction (codes 16/17).
789    ///   Must be msg.msg_id from the caller, not bad_msg_id (client clock, not server's).
790    #[allow(clippy::too_many_arguments)]
791    fn scan_body(
792        body: &[u8],
793        salt: &mut i64,
794        need_resend: &mut bool,
795        need_session_reset: &mut bool,
796        bad_msg_code: &mut Option<u32>,
797        bad_msg_server_id: &mut Option<i64>,
798        sent_msg_id: Option<i64>,
799        server_msg_id: i64,
800    ) -> Result<Option<Vec<u8>>, InvocationError> {
801        if body.len() < 4 {
802            return Ok(None);
803        }
804        let cid = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
805        match cid {
806            0xf35c6d01 /* rpc_result: CID(4) + req_msg_id(8) + result */ => {
807                if body.len() >= 12
808                    && let Some(expected) = sent_msg_id {
809                        let resp_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 checked above"));
810                        if resp_id != expected {
811                            tracing::debug!(
812                                "[dc_pool] rpc_result req_msg_id mismatch \
813                                 (got {resp_id:#018x}, want {expected:#018x}); skipping"
814                            );
815                            return Ok(None);
816                        }
817                    }
818                let inner = if body.len() >= 12 { &body[12..] } else { body };
819                // Inner body may itself be gzip_packed (e.g. help.Config inside rpc_result).
820                if inner.len() >= 4
821                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 4 checked above")) == 0x3072cfa1
822                {
823                    let mut dummy_salt = *salt;
824                    let mut nr = false; let mut nsr = false;
825                    let mut bc = None; let mut bsi = None;
826                    if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
827                        return Ok(Some(r));
828                    }
829                    // Unwrap the gzip directly and return the decompressed bytes.
830                    if let Some(compressed) = tl_read_bytes(&inner[4..]) {
831                        let dec = flate2::read::GzDecoder::new(compressed.as_slice());
832                        let mut limited = std::io::Read::take(dec, 16 * 1024 * 1024);
833                        let mut out = Vec::new();
834                        if std::io::Read::read_to_end(&mut limited, &mut out).is_ok() {
835                            return Ok(Some(out));
836                        }
837                    }
838                    return Ok(None);
839                }
840                if inner.len() >= 8
841                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 8 checked above")) == 0x2144ca19
842                {
843                    let code = i32::from_le_bytes(inner[4..8].try_into().expect("inner.len() >= 8 checked above"));
844                    let message = tl_read_string(&inner[8..]).unwrap_or_default();
845                    return Err(InvocationError::Rpc(
846                        crate::errors::RpcError::from_telegram(code, &message),
847                    ));
848                }
849                Ok(Some(inner.to_vec()))
850            }
851            0x2144ca19 /* rpc_error */ => {
852                if body.len() < 8 {
853                    return Err(InvocationError::Deserialize("rpc_error short".into()));
854                }
855                let code = i32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 checked above"));
856                let message = tl_read_string(&body[8..]).unwrap_or_default();
857                Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
858            }
859            0xedab447b /* bad_server_salt */ => {
860                // bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int error_code:int new_server_salt:long
861                if body.len() >= 28 {
862                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
863                    let new_salt   = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
864                    // Only apply new salt when bad_msg_id matches our sent request;
865                    // stale frames from prior requests must not corrupt the current salt.
866                    if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
867                        *salt = new_salt;
868                        *need_resend = true;
869                    }
870                }
871                Ok(None)
872            }
873            0x9ec20908 /* new_session_created */ => {
874                // new_session_created#9ec20908 first_msg_id:long unique_id:long server_salt:long
875                // Signal need_session_reset so the caller resets seq_no before resending.
876                if body.len() >= 28 {
877                    let first_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
878                    let unique_id    = i64::from_le_bytes(body[12..20].try_into().expect("body.len() >= 28 checked above"));
879                    let server_salt  = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
880                    tracing::debug!(
881                        "[dc_pool] new_session_created: unique_id={unique_id:#018x} \
882                         first_msg_id={first_msg_id} salt={server_salt}"
883                    );
884                    *salt = server_salt;
885                    // Only reset if the pending request predates the server's new session.
886                    // If sent_msg_id == first_msg_id (fresh worker conn on first send),
887                    // the server will reply with our current session_id. Unconditionally
888                    // calling reset_session() here changes the id, causing the response
889                    // decrypt to fail with session_id mismatch.
890                    if sent_msg_id.is_some_and(|id| id < first_msg_id) {
891                        *need_session_reset = true;
892                    }
893                }
894                Ok(None)
895            }
896            0xa7eff811 /* bad_msg_notification */ => {
897                // bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int error_code:int
898                //
899                // TL layout: body[4..12]=bad_msg_id, body[12..16]=bad_msg_seqno,
900                // body[16..20]=error_code. Previous code read [12..16] as error_code
901                // (bad_msg_seqno), so error matching always compared the wrong field.
902                if body.len() >= 20 {
903                    let bad_msg_id  = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 20 checked above"));
904                    // body[12..16] = bad_msg_seqno, not used for recovery.
905                    let error_code  = u32::from_le_bytes(body[16..20].try_into().expect("body.len() >= 20 checked above"));
906                    tracing::debug!(
907                        "[dc_pool] bad_msg_notification: bad_msg_id={bad_msg_id:#018x} code={error_code}"
908                    );
909                    match error_code {
910                        16 | 17 => {
911                            // msg_id too low/high: time-offset correction needed.
912                            // server_msg_id upper 32 bits = server Unix timestamp.
913                            // bad_msg_id carries the client's clock, not the server's.
914                            *bad_msg_code = Some(error_code);
915                            *bad_msg_server_id = Some(server_msg_id);
916                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
917                        }
918                        32 | 33 => {
919                            // seq_no wrong.
920                            *bad_msg_code = Some(error_code);
921                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
922                        }
923                        48 => {
924                            // bad_msg code 48 = incorrect server salt. Per spec, this
925                            // arrives together with a bad_server_salt frame in the same
926                            // container that carries the new salt. If bad_server_salt was
927                            // already processed, *salt is updated and the resend uses the
928                            // correct value. If not (partial container), resend once
929                            // conservatively; the retry loop's 5-attempt cap prevents a loop.
930                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
931                            tracing::debug!(
932                                "[dc_pool] bad_msg code 48 (wrong salt): will resend with current salt"
933                            );
934                        }
935                        _ => {
936                            // Unknown code; resend to avoid the loop stalling.
937                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
938                        }
939                    }
940                }
941                Ok(None)
942            }
943            0x347773c5 /* pong */ => {
944                // Pong is returned for both internal PingDelayDisconnect (fire-and-forget)
945                // and user-invoked Ping (which has a pending invoke future waiting).
946                // pong layout: CID(4) + msg_id(8) + ping_id(8)
947                // pong.msg_id is the msg_id of the original ping request.
948                // Route back to the caller when it matches the pending sent_msg_id.
949                if body.len() >= 12
950                    && let Some(expected) = sent_msg_id
951                {
952                    let pong_req_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 for pong"));
953                    if pong_req_id == expected {
954                        return Ok(Some(body.to_vec()));
955                    }
956                }
957                // Internal keepalive pong - discard.
958                Ok(None)
959            }
960            0x73f1f8dc /* msg_container */ => {
961                if body.len() < 8 {
962                    return Ok(None);
963                }
964                let count = u32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 for msg_container")) as usize;
965                let mut pos = 8usize;
966                // Do not early-return: containers may bundle new_session_created + rpc_result
967                // together; all items must be processed so session/salt flags are observed.
968                let mut found: Option<Vec<u8>> = None;
969                for _ in 0..count {
970                    if pos + 16 > body.len() { break; }
971                    let inner_bytes =
972                        u32::from_le_bytes(body[pos + 12..pos + 16].try_into().expect("pos+16 <= body.len() checked above")) as usize;
973                    pos += 16;
974                    if pos + inner_bytes > body.len() { break; }
975                    let inner = &body[pos..pos + inner_bytes];
976                    pos += inner_bytes;
977                    if found.is_none() {
978                        if let Some(r) = Self::scan_body(inner, salt, need_resend,
979                            need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
980                            server_msg_id)?
981                        {
982                            found = Some(r);
983                            // Do NOT return  - continue processing remaining items so that
984                            // session/salt flags from co-arriving messages are observed.
985                        }
986                    } else {
987                        // Result already captured; still process remaining items for
988                        // side-effect flags (salt, session reset, bad_msg). Pass
989                        // sent_msg_id so the req_msg_id guard still filters stale
990                        // rpc_results. Passing None would bypass the guard and allow
991                        // a stale response to overwrite `found` on the next iteration.
992                        let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
993                                                bad_msg_code, bad_msg_server_id, sent_msg_id,
994                                                server_msg_id)?;
995                    }
996                }
997                Ok(found)
998            }
999            0x3072cfa1 /* gzip_packed */ => {
1000                // Decompress and recurse: server wraps large responses in gzip_packed.
1001                if let Some(compressed) = tl_read_bytes(&body[4..]) {
1002                    let decoder = flate2::read::GzDecoder::new(compressed.as_slice());
1003                    let mut limited = std::io::Read::take(decoder, 16 * 1024 * 1024);
1004                    let mut decompressed = Vec::new();
1005                    if std::io::Read::read_to_end(&mut limited, &mut decompressed).is_ok()
1006                        && !decompressed.is_empty()
1007                    {
1008                        return Self::scan_body(
1009                            &decompressed, salt,
1010                            need_resend, need_session_reset,
1011                            bad_msg_code, bad_msg_server_id,
1012                            sent_msg_id,
1013                            server_msg_id,
1014                        );
1015                    }
1016                }
1017                Ok(None)
1018            }
1019            _ => Ok(None),
1020        }
1021    }
1022
1023    /// Like `rpc_call` but accepts any `Serializable` type (not just `RemoteCall`).
1024    pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
1025        &mut self,
1026        req: &S,
1027    ) -> Result<Vec<u8>, InvocationError> {
1028        if !self.pending_acks.is_empty() {
1029            let ack_body = build_msgs_ack_body(&self.pending_acks);
1030            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1031            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
1032            self.pending_acks.clear();
1033        }
1034        let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
1035        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
1036        let mut salt_retries = 0u8;
1037        let mut session_resets = 0u8;
1038        loop {
1039            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
1040            let msg = self
1041                .enc
1042                .unpack(&mut raw)
1043                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1044            self.pending_acks.push(msg.msg_id);
1045            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
1046                let ack_body = build_msgs_ack_body(&self.pending_acks);
1047                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1048                let _ =
1049                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
1050                self.pending_acks.clear();
1051            }
1052            // Salt updated only on explicit bad_server_salt, not on every message.
1053            if msg.body.len() < 4 {
1054                return Ok(msg.body);
1055            }
1056            let mut need_resend = false;
1057            let mut need_session_reset = false;
1058            let mut bad_msg_code: Option<u32> = None;
1059            let mut bad_msg_server_id: Option<i64> = None;
1060            // Save result before handling flags; apply all before returning.
1061            let scan_result = Self::scan_body(
1062                &msg.body,
1063                &mut self.enc.salt,
1064                &mut need_resend,
1065                &mut need_session_reset,
1066                &mut bad_msg_code,
1067                &mut bad_msg_server_id,
1068                Some(sent_msg_id),
1069                msg.msg_id,
1070            )?;
1071            if need_session_reset {
1072                session_resets += 1;
1073                if session_resets > 2 {
1074                    return Err(InvocationError::Deserialize(
1075                        "new_session_created (serializable): exceeded 2 resets".into(),
1076                    ));
1077                }
1078                if !self.pending_acks.is_empty() {
1079                    let ack_body = build_msgs_ack_body(&self.pending_acks);
1080                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1081                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
1082                        .await;
1083                    self.pending_acks.clear();
1084                }
1085                if scan_result.is_none() {
1086                    let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
1087                    sent_msg_id = new_id;
1088                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
1089                }
1090            } else if need_resend {
1091                match bad_msg_code {
1092                    Some(16) | Some(17) => {
1093                        if let Some(srv_id) = bad_msg_server_id {
1094                            self.enc.correct_time_offset(srv_id);
1095                        }
1096                        // Do not call undo_seq_no (see rpc_call for explanation).
1097                    }
1098                    Some(32) | Some(33) => {
1099                        self.enc
1100                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
1101                    }
1102                    _ => {
1103                        self.enc.undo_seq_no();
1104                    }
1105                }
1106                salt_retries += 1;
1107                if salt_retries >= 5 {
1108                    return Err(InvocationError::Deserialize(
1109                        "bad_server_salt (serializable): exceeded 5 retries".into(),
1110                    ));
1111                }
1112                tracing::debug!(
1113                    "[dc_pool] resend serializable (code={bad_msg_code:?}) [{salt_retries}/5]"
1114                );
1115                if !self.pending_acks.is_empty() {
1116                    let ack_body = build_msgs_ack_body(&self.pending_acks);
1117                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
1118                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
1119                        .await;
1120                    self.pending_acks.clear();
1121                }
1122                let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
1123                sent_msg_id = new_id;
1124                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
1125            }
1126            if let Some(result) = scan_result {
1127                return Ok(result);
1128            }
1129        }
1130    }
1131
1132    /// Send pre-serialized raw bytes and receive the raw response.
1133    /// Used by CDN download connections (no MTProto encryption layer).
1134    pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
1135        Self::send_abridged(&mut self.stream, body, self.cipher.as_mut()).await?;
1136        Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await
1137    }
1138
1139    async fn send_abridged(
1140        stream: &mut TcpStream,
1141        data: &[u8],
1142        cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1143    ) -> Result<(), InvocationError> {
1144        // Single write_all: avoids Nagle stalls and partial-write corruption.
1145        let words = data.len() / 4;
1146        let mut frame = if words < 0x7f {
1147            let mut v = Vec::with_capacity(1 + data.len());
1148            v.push(words as u8);
1149            v
1150        } else {
1151            let mut v = Vec::with_capacity(4 + data.len());
1152            v.extend_from_slice(&[
1153                0x7f,
1154                (words & 0xff) as u8,
1155                ((words >> 8) & 0xff) as u8,
1156                ((words >> 16) & 0xff) as u8,
1157            ]);
1158            v
1159        };
1160        frame.extend_from_slice(data);
1161        if let Some(c) = cipher {
1162            c.encrypt(&mut frame);
1163        }
1164        stream.write_all(&frame).await?;
1165        Ok(())
1166    }
1167
1168    async fn recv_abridged(
1169        stream: &mut TcpStream,
1170        mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1171    ) -> Result<Vec<u8>, InvocationError> {
1172        // 60-second recv timeout: prevents hung reads on silently closed connections.
1173        use tokio::time::{Duration, timeout};
1174        const RECV_TIMEOUT: Duration = Duration::from_secs(60);
1175
1176        let mut h = [0u8; 1];
1177        timeout(RECV_TIMEOUT, stream.read_exact(&mut h))
1178            .await
1179            .map_err(|_| {
1180                InvocationError::Io(std::io::Error::new(
1181                    std::io::ErrorKind::TimedOut,
1182                    "transfer recv: header timeout (60 s)",
1183                ))
1184            })??;
1185        if let Some(ref mut c) = cipher.as_mut() {
1186            c.decrypt(&mut h);
1187        }
1188
1189        // 0x7f = extended length; next 3 bytes are the LE word count.
1190        let words = if h[0] == 0x7f {
1191            let mut b = [0u8; 3];
1192            timeout(RECV_TIMEOUT, stream.read_exact(&mut b))
1193                .await
1194                .map_err(|_| {
1195                    InvocationError::Io(std::io::Error::new(
1196                        std::io::ErrorKind::TimedOut,
1197                        "transfer recv: length timeout (60 s)",
1198                    ))
1199                })??;
1200            if let Some(ref mut c) = cipher.as_mut() {
1201                c.decrypt(&mut b);
1202            }
1203            b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
1204        } else {
1205            h[0] as usize
1206        };
1207
1208        let mut buf = vec![0u8; words * 4];
1209        timeout(RECV_TIMEOUT, stream.read_exact(&mut buf))
1210            .await
1211            .map_err(|_| {
1212                InvocationError::Io(std::io::Error::new(
1213                    std::io::ErrorKind::TimedOut,
1214                    "transfer recv: body timeout (60 s)",
1215                ))
1216            })??;
1217        if let Some(c) = cipher {
1218            c.decrypt(&mut buf);
1219        }
1220
1221        // Transport errors are exactly 4 bytes (negative LE i32).
1222        // A valid encrypted MTProto frame is always ≥ 68 bytes:
1223        //   auth_key_id(8) + msg_key(16) + encrypted[salt(8)+session_id(8)+
1224        //   msg_id(8)+seq_no(4)+data_len(4)+body(≥4)+padding(≥12)] ≥ 68 bytes.
1225        // Checking buf.len() == 4 is therefore both necessary and sufficient to
1226        // distinguish a transport error from a valid encrypted frame; this is
1227        // correct by protocol structure, not merely empirically safe.
1228        if buf.len() == 4 {
1229            let code =
1230                i32::from_le_bytes(buf[..4].try_into().expect("buf.len() == 4 checked above"));
1231            if code < 0 {
1232                return Err(InvocationError::Io(std::io::Error::new(
1233                    std::io::ErrorKind::ConnectionRefused,
1234                    format!("transport error from server: {code}"),
1235                )));
1236            }
1237        }
1238
1239        Ok(buf)
1240    }
1241
1242    async fn send_plain_frame(
1243        stream: &mut TcpStream,
1244        data: &[u8],
1245        cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1246    ) -> Result<(), InvocationError> {
1247        // Abridged framing uses word-count (len/4): pad to 4-byte boundary.
1248        // TL parsers ignore trailing zero bytes.
1249        if !data.len().is_multiple_of(4) {
1250            let mut padded = data.to_vec();
1251            let pad = 4 - (data.len() % 4);
1252            padded.resize(data.len() + pad, 0);
1253            Self::send_abridged(stream, &padded, cipher).await
1254        } else {
1255            Self::send_abridged(stream, data, cipher).await
1256        }
1257    }
1258
1259    async fn recv_plain_frame<T: Deserializable>(
1260        stream: &mut TcpStream,
1261        cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1262    ) -> Result<T, InvocationError> {
1263        let raw = Self::recv_abridged(stream, cipher).await?;
1264        // A 4-byte negative payload is a transport error code from the server.
1265        // Surface it directly rather than masking it with "plain frame too short".
1266        if raw.len() == 4 {
1267            let code =
1268                i32::from_le_bytes(raw[..4].try_into().expect("raw.len() == 4 checked above"));
1269            if code < 0 {
1270                return Err(InvocationError::Deserialize(format!(
1271                    "server transport error during DH: code {code}"
1272                )));
1273            }
1274        }
1275        if raw.len() < 20 {
1276            return Err(InvocationError::Deserialize("plain frame too short".into()));
1277        }
1278        // auth_key_id must be 0 in plaintext frames; checked after length guard above.
1279        if u64::from_le_bytes(
1280            raw[..8]
1281                .try_into()
1282                .expect("raw.len() >= 20 checked above, >= 8 implied"),
1283        ) != 0
1284        {
1285            return Err(InvocationError::Deserialize(
1286                "expected auth_key_id=0 in plaintext".into(),
1287            ));
1288        }
1289        let body_len = u32::from_le_bytes(
1290            raw[16..20]
1291                .try_into()
1292                .expect("raw.len() >= 20 checked above"),
1293        ) as usize;
1294        if raw.len() < 20 + body_len {
1295            return Err(InvocationError::Deserialize(format!(
1296                "plain frame truncated: have {} bytes, need {}",
1297                raw.len(),
1298                20 + body_len
1299            )));
1300        }
1301        let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1302        T::deserialize(&mut cur).map_err(Into::into)
1303    }
1304}
1305
1306/// Check a decrypted PFS bind response body for boolTrue.
1307/// Decode one bare MTProto message body for the auth.bindTempAuthKey response (pool path).
1308fn pfs_pool_decode_bind_single(body: &[u8]) -> Result<(), String> {
1309    const RPC_RESULT: u32 = 0xf35c6d01;
1310    const BOOL_TRUE: u32 = 0x9972_75b5;
1311    const BOOL_FALSE: u32 = 0xbc79_9737;
1312    const RPC_ERROR: u32 = 0x2144_ca19;
1313    const BAD_MSG: u32 = 0xa7ef_f811;
1314    const BAD_SALT: u32 = 0xedab_447b;
1315    const NEW_SESSION: u32 = 0x9ec2_0908;
1316    const FUTURE_SALTS: u32 = 0xae50_0895;
1317    const MSGS_ACK: u32 = 0x62d6_b459; // msgs_ack#62d6b459
1318    const PONG: u32 = 0x0347_73c5;
1319
1320    if body.len() < 4 {
1321        return Err("skip".into());
1322    }
1323    let ctor = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
1324
1325    match ctor {
1326        BOOL_TRUE => Ok(()),
1327        BOOL_FALSE => Err("server returned boolFalse (binding rejected)".into()),
1328        NEW_SESSION | FUTURE_SALTS | MSGS_ACK | PONG => Err("skip".into()),
1329
1330        RPC_RESULT if body.len() >= 16 => {
1331            let inner = u32::from_le_bytes(
1332                body[12..16]
1333                    .try_into()
1334                    .expect("body.len() >= 16 from match arm guard"),
1335            );
1336            match inner {
1337                BOOL_TRUE => Ok(()),
1338                BOOL_FALSE => Err("rpc_result{boolFalse} (server rejected binding)".into()),
1339                RPC_ERROR if body.len() >= 20 => {
1340                    let code = i32::from_le_bytes(
1341                        body[16..20]
1342                            .try_into()
1343                            .expect("body.len() >= 20 from match arm guard"),
1344                    );
1345                    let msg = tl_read_string(body.get(20..).unwrap_or(&[])).unwrap_or_default();
1346                    Err(format!("rpc_error code={code} message={msg:?}"))
1347                }
1348                _ => Err(format!("rpc_result inner ctor={inner:#010x}")),
1349            }
1350        }
1351
1352        BAD_MSG if body.len() >= 16 => {
1353            let code = u32::from_le_bytes(
1354                body[12..16]
1355                    .try_into()
1356                    .expect("body.len() >= 16 from match arm guard"),
1357            );
1358            let desc = match code {
1359                16 => "msg_id too low (clock skew)",
1360                17 => "msg_id too high (clock skew)",
1361                18 => "incorrect lower 2 bits of msg_id",
1362                19 => "duplicate msg_id",
1363                20 => "message too old (>300s)",
1364                32 => "msg_seqno too low",
1365                33 => "msg_seqno too high",
1366                48 => "incorrect server salt",
1367                _ => "unknown code",
1368            };
1369            Err(format!("bad_msg_notification code={code} ({desc})"))
1370        }
1371
1372        BAD_SALT if body.len() >= 24 => {
1373            let new_salt = i64::from_le_bytes(
1374                body[16..24]
1375                    .try_into()
1376                    .expect("body.len() >= 24 from match arm guard"),
1377            );
1378            Err(format!(
1379                "bad_server_salt, server wants salt={new_salt:#018x}"
1380            ))
1381        }
1382
1383        _ => Err(format!("unknown ctor={ctor:#010x}")),
1384    }
1385}
1386
1387/// Decode the server response to auth.bindTempAuthKey (pool path).
1388///
1389/// Handles bare messages AND msg_container (the server frequently bundles
1390/// new_session_created + rpc_result together in a container).
1391fn pfs_pool_decode_bind_response(body: &[u8]) -> Result<(), String> {
1392    const MSG_CONTAINER: u32 = 0x73f1f8dc;
1393
1394    if body.len() < 4 {
1395        return Err(format!("response body too short ({} bytes)", body.len()));
1396    }
1397    let ctor = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
1398
1399    if ctor != MSG_CONTAINER {
1400        return pfs_pool_decode_bind_single(body).map_err(|e| {
1401            if e == "skip" {
1402                "__need_more__".into()
1403            } else {
1404                e
1405            }
1406        });
1407    }
1408
1409    if body.len() < 8 {
1410        return Err("msg_container too short to read count".into());
1411    }
1412    let count = u32::from_le_bytes(
1413        body[4..8]
1414            .try_into()
1415            .expect("body.len() >= 8 checked above"),
1416    ) as usize;
1417    let mut pos = 8usize;
1418    let mut last_real_err: Option<String> = None;
1419
1420    for i in 0..count {
1421        if pos + 16 > body.len() {
1422            return Err(format!(
1423                "msg_container truncated at message {i}/{count} (pos={pos} body_len={})",
1424                body.len()
1425            ));
1426        }
1427        let msg_bytes = u32::from_le_bytes(
1428            body[pos + 12..pos + 16]
1429                .try_into()
1430                .expect("pos+16 <= body.len() checked above"),
1431        ) as usize;
1432        pos += 16;
1433
1434        if pos + msg_bytes > body.len() {
1435            return Err(format!(
1436                "msg_container message {i} body overflows (need {msg_bytes}, have {})",
1437                body.len() - pos
1438            ));
1439        }
1440        let msg_body = &body[pos..pos + msg_bytes];
1441        pos += msg_bytes;
1442
1443        match pfs_pool_decode_bind_single(msg_body) {
1444            Ok(()) => return Ok(()),
1445            Err(e) if e == "skip" => continue,
1446            Err(e) => {
1447                last_real_err = Some(e);
1448            }
1449        }
1450    }
1451
1452    Err(last_real_err.unwrap_or_else(|| "__need_more__".into()))
1453}
1454
/// Read the payload of a TL-serialized byte string located at the start of `data`.
///
/// A first byte below 254 is the payload length itself; any other first byte
/// means the next three bytes hold the length in little-endian order.
/// Trailing alignment padding is neither validated nor consumed.
///
/// Returns an empty vector for empty input, and `None` when the length
/// prefix or the payload is truncated.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let first = match data.first() {
        Some(&b) => b,
        None => return Some(Vec::new()),
    };

    let (offset, size) = match first {
        // Short form: one length byte, payload follows immediately.
        0..=253 => (1usize, usize::from(first)),
        // Long form: marker byte plus a 24-bit little-endian length.
        _ => {
            let hdr = data.get(1..4)?;
            let size =
                usize::from(hdr[0]) | usize::from(hdr[1]) << 8 | usize::from(hdr[2]) << 16;
            (4usize, size)
        }
    };

    data.get(offset..offset + size).map(|payload| payload.to_vec())
}
1474
1475fn tl_read_string(data: &[u8]) -> Option<String> {
1476    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
1477}