Skip to main content

ferogram_mtsender/
sender.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13use ferogram_connect::FrameKind;
14use ferogram_mtproto::{
15    EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
16};
17use ferogram_tl_types as tl;
18use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
19use tokio::io::AsyncReadExt;
20use tokio::net::TcpStream;
21
22use crate::errors::InvocationError;
23use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
24use ferogram_connect::TransportKind;
25// metrics and tracing
26#[allow(unused_imports)]
27use metrics::{counter, histogram};
28
29/// A single encrypted connection to one Telegram DC.
30/// Un-acked server msg_ids to accumulate before eagerly flushing a `msgs_ack` frame.
31const PENDING_ACKS_THRESHOLD: usize = 10;
32
33/// `PingDelayDisconnect` interval for worker connections (in GetFile chunks).
34/// Keeps the socket alive within Telegram's 75-second idle-disconnect window.
35const PING_EVERY_N_CHUNKS: u32 = 5;
36
37pub struct DcConnection {
38    stream: TcpStream,
39    enc: EncryptedSession,
40    pending_acks: Vec<i64>,
41    call_count: u32,
42    /// Active framing kind for this connection.
43    frame_kind: FrameKind,
44    /// Persistent dedup ring that outlives individual EncryptedSessions.
45    #[allow(dead_code)]
46    seen_msg_ids: SeenMsgIds,
47}
48
49impl DcConnection {
50    /// Race Obfuscated / Abridged / Http transports and return the first to succeed.
51    #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
52    pub async fn connect_fastest(
53        addr: &str,
54        socks5: Option<&ferogram_connect::Socks5Config>,
55        dc_id: i16,
56    ) -> Result<(Self, &'static str), InvocationError> {
57        use tokio::task::JoinSet;
58        let addr = addr.to_owned();
59        let socks5 = socks5.cloned();
60        tracing::debug!(
61            "[ferogram::sender] probing {addr} with Full, Obfuscated, and Abridged transports in parallel"
62        );
63        let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
64            JoinSet::new();
65
66        {
67            let a = addr.clone();
68            let s = socks5.clone();
69            set.spawn(async move {
70                Ok((
71                    DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Full, dc_id).await?,
72                    "Full",
73                ))
74            });
75        }
76        {
77            let a = addr.clone();
78            let s = socks5.clone();
79            set.spawn(async move {
80                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
81                Ok((
82                    DcConnection::connect_raw(
83                        &a,
84                        s.as_ref(),
85                        &TransportKind::Obfuscated { secret: None },
86                        dc_id,
87                    )
88                    .await?,
89                    "Obfuscated",
90                ))
91            });
92        }
93        {
94            let a = addr.clone();
95            let s = socks5.clone();
96            set.spawn(async move {
97                tokio::time::sleep(std::time::Duration::from_millis(400)).await;
98                Ok((
99                    DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
100                        .await?,
101                    "Abridged",
102                ))
103            });
104        }
105        {
106            let a = addr.clone();
107            set.spawn(async move {
108                tokio::time::sleep(std::time::Duration::from_millis(800)).await;
109                Ok((
110                    DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
111                    "Http",
112                ))
113            });
114        }
115
116        let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
117        while let Some(outcome) = set.join_next().await {
118            match outcome {
119                Ok(Ok((conn, label))) => {
120                    set.abort_all();
121                    return Ok((conn, label));
122                }
123                Ok(Err(e)) => {
124                    last_err = e;
125                }
126                Err(e) if e.is_cancelled() => {}
127                Err(_) => {}
128            }
129        }
130        Err(last_err)
131    }
132
133    /// Connect and perform full DH handshake.
134    #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
135    pub async fn connect_raw(
136        addr: &str,
137        socks5: Option<&ferogram_connect::Socks5Config>,
138        transport: &TransportKind,
139        dc_id: i16,
140    ) -> Result<Self, InvocationError> {
141        tracing::debug!("[ferogram::sender] connecting to {addr} with known auth key");
142        let (stream, frame_kind, enc) =
143            ferogram_connect::connect_to_dc(addr, dc_id, transport, socks5, None).await?;
144
145        tracing::debug!("[ferogram::sender] DH complete, auth key established for {addr}");
146        let seen = new_seen_msg_ids();
147        Ok(Self {
148            stream,
149            frame_kind,
150            enc: EncryptedSession::with_seen(
151                enc.auth_key_bytes(),
152                enc.salt,
153                enc.time_offset,
154                seen.clone(),
155            ),
156            pending_acks: Vec::new(),
157            call_count: 0,
158            seen_msg_ids: seen,
159        })
160    }
161
162    /// Connect with an already-known auth key (no DH needed).
163    /// If `pfs` is true, performs a temp-key DH bind before any RPCs.
164    #[allow(clippy::too_many_arguments)]
165    pub async fn connect_with_key(
166        addr: &str,
167        auth_key: [u8; 256],
168        first_salt: i64,
169        time_offset: i32,
170        socks5: Option<&ferogram_connect::Socks5Config>,
171        mtproxy: Option<&ferogram_connect::MtProxyConfig>,
172        transport: &TransportKind,
173        dc_id: i16,
174        pfs: bool,
175    ) -> Result<Self, InvocationError> {
176        // ferogram-connect owns TCP open + keepalive + transport init.
177        let (mut stream, mut frame_kind) =
178            ferogram_connect::Connection::open_stream_pub(addr, dc_id, transport, socks5, mtproxy)
179                .await?;
180
181        if pfs {
182            tracing::debug!("[ferogram::sender] PFS: binding temporary key for DC{dc_id}");
183            match Self::do_pool_pfs_bind(&mut stream, &mut frame_kind, &auth_key, dc_id).await {
184                Ok(temp_enc) => {
185                    tracing::debug!("[ferogram::sender] PFS: temporary key bound for DC{dc_id}");
186                    return Ok(Self {
187                        stream,
188                        frame_kind,
189                        enc: temp_enc,
190                        pending_acks: Vec::new(),
191                        call_count: 0,
192                        seen_msg_ids: new_seen_msg_ids(),
193                    });
194                }
195                Err(e) => {
196                    tracing::warn!(
197                        "[ferogram::sender] PFS bind failed for DC{dc_id} ({e}); using permanent key"
198                    );
199                    return Err(e);
200                }
201            }
202        }
203
204        let seen = new_seen_msg_ids();
205        Ok(Self {
206            stream,
207            frame_kind,
208            enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
209            pending_acks: Vec::new(),
210            call_count: 0,
211            seen_msg_ids: seen,
212        })
213    }
214
215    /// Temp-key DH handshake + auth.bindTempAuthKey on an existing stream.
216    async fn do_pool_pfs_bind(
217        stream: &mut tokio::net::TcpStream,
218        kind: &mut FrameKind,
219        perm_auth_key: &[u8; 256],
220        dc_id: i16,
221    ) -> Result<EncryptedSession, InvocationError> {
222        use ferogram_mtproto::{
223            auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
224            serialize_bind_temp_auth_key,
225        };
226        const TEMP_EXPIRES: i32 = 86_400; // 24 h
227
228        // temp-key DH
229        let mut plain = Session::new();
230
231        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
232        Self::send_plain_frame(stream, &plain.pack(&req1).to_plaintext_bytes(), kind).await?;
233        let res_pq: tl::enums::ResPq = Self::recv_plain_frame(stream, kind).await?;
234
235        let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
236            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
237        Self::send_plain_frame(stream, &plain.pack(&req2).to_plaintext_bytes(), kind).await?;
238        let dh: tl::enums::ServerDhParams = Self::recv_plain_frame(stream, kind).await?;
239
240        let (req3, s3) =
241            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
242        Self::send_plain_frame(stream, &plain.pack(&req3).to_plaintext_bytes(), kind).await?;
243        let ans: tl::enums::SetClientDhParamsAnswer = Self::recv_plain_frame(stream, kind).await?;
244
245        let done = {
246            let mut result =
247                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
248            let mut attempts = 0u8;
249            loop {
250                match result {
251                    ferogram_mtproto::FinishResult::Done(d) => break d,
252                    ferogram_mtproto::FinishResult::Retry {
253                        retry_id,
254                        dh_params,
255                        nonce,
256                        server_nonce,
257                        new_nonce,
258                    } => {
259                        attempts += 1;
260                        if attempts >= 5 {
261                            return Err(InvocationError::Deserialize(
262                                "PFS pool temp DH retry exceeded 5".into(),
263                            ));
264                        }
265                        let (rr, s3r) = ferogram_mtproto::retry_step3(
266                            &dh_params,
267                            nonce,
268                            server_nonce,
269                            new_nonce,
270                            retry_id,
271                        )
272                        .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
273                        Self::send_plain_frame(stream, &plain.pack(&rr).to_plaintext_bytes(), kind)
274                            .await?;
275                        let ar: tl::enums::SetClientDhParamsAnswer =
276                            Self::recv_plain_frame(stream, kind).await?;
277                        result = auth::finish(s3r, ar)
278                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
279                    }
280                }
281            }
282        };
283
284        let temp_key = done.auth_key;
285        let temp_salt = done.first_salt;
286        let temp_offset = done.time_offset;
287
288        // build bindTempAuthKey body
289        let temp_key_id = auth_key_id_from_key(&temp_key);
290        let perm_key_id = auth_key_id_from_key(perm_auth_key);
291
292        let mut nonce_buf = [0u8; 8];
293        ferogram_crypto::fill_random(&mut nonce_buf);
294        let nonce = i64::from_le_bytes(nonce_buf);
295
296        let server_now = std::time::SystemTime::now()
297            .duration_since(std::time::UNIX_EPOCH)
298            .expect("system clock is before UNIX epoch")
299            .as_secs() as i32
300            + temp_offset;
301        let expires_at = server_now + TEMP_EXPIRES;
302
303        let seen = new_seen_msg_ids();
304        let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
305        let temp_session_id = temp_enc.session_id();
306
307        let msg_id = gen_msg_id();
308        let enc_msg = encrypt_bind_inner(
309            perm_auth_key,
310            msg_id,
311            nonce,
312            temp_key_id,
313            perm_key_id,
314            temp_session_id,
315            expires_at,
316        );
317        let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
318
319        // send encrypted bind request
320        let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
321        Self::send_abridged(stream, &wire, kind).await?;
322
323        // Receive and verify response.
324        // The server may send informational frames first (msgs_ack, new_session_created)
325        // before the actual rpc_result{boolTrue}, so we loop up to 5 frames.
326        for attempt in 0u8..5 {
327            let mut raw = Self::recv_abridged(stream, kind).await?;
328            let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
329                InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
330            })?;
331            match ferogram_connect::decode_bind_response(&decrypted.body) {
332                Ok(()) => {
333                    // bindTempAuthKey succeeds under the temp key; keep the session
334                    // sequence as-is so subsequent RPCs continue from the same MTProto
335                    // message stream.
336                    return Ok(temp_enc);
337                }
338                Err(ref e) if e == "__need_more__" => {
339                    tracing::debug!(
340                        "[ferogram::sender] PFS (DC{dc_id}): got informational frame on attempt {attempt}, reading next"
341                    );
342                    continue;
343                }
344                Err(reason) => {
345                    tracing::error!(
346                        "[ferogram::sender] PFS bind rejected by server for DC{dc_id}: {reason}"
347                    );
348                    return Err(InvocationError::Deserialize(format!(
349                        "auth.bindTempAuthKey (pool): {reason}"
350                    )));
351                }
352            }
353        }
354        Err(InvocationError::Deserialize(
355            "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
356        ))
357    }
358
359    /// The auth key this connection is currently encrypted with. Unlike
360    /// [`crate::MtpSender::auth_key_bytes`], there's no separate permanent
361    /// key tracked here, so under PFS this is the temporary key, not one
362    /// safe to persist to the session.
363    pub fn auth_key_bytes(&self) -> [u8; 256] {
364        self.enc.auth_key_bytes()
365    }
366    /// The server salt this connection started with.
367    pub fn first_salt(&self) -> i64 {
368        self.enc.salt
369    }
370    /// Clock offset (seconds) between this client and the server.
371    pub fn time_offset(&self) -> i32 {
372        self.enc.time_offset
373    }
374
375    /// Break the connection into its raw parts so the caller can hand them to
376    /// the pipelined [`crate::sender_task::spawn_sender_task`] instead of using
377    /// this struct's blocking [`rpc_call`](Self::rpc_call).
378    ///
379    /// `DcPool` uses this once a connection has finished its setup (DH, PFS
380    /// bind, initConnection) as a plain `DcConnection`: the pool graduates it
381    /// into a background sender task so every later request on the pool's
382    /// `invoke_on_dc` path pipelines instead of blocking the connection for
383    /// each round trip.
384    pub(crate) fn into_parts(self) -> (TcpStream, FrameKind, EncryptedSession) {
385        (self.stream, self.frame_kind, self.enc)
386    }
387
388    /// Send `req` and block until its matching `rpc_result` comes back,
389    /// discarding or handling anything else that arrives in between (server
390    /// pushes, the periodic keepalive ping, salt/session-reset retries).
391    /// One request at a time; `DcPool` graduates connections that need
392    /// pipelined concurrent requests into a background sender task instead
393    /// of using this directly.
394    #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
395    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
396        let _t0 = std::time::Instant::now();
397        // Periodic PingDelayDisconnect: sent before the request to piggyback on
398        // the same TCP write window.  Keeps the socket alive across the download.
399        self.call_count += 1;
400        if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
401            let ping_id = self.call_count as i64;
402            let ping_body = build_msgs_ack_ping_body(ping_id);
403            // PingDelayDisconnect is content-related (returns Pong): must use odd seq_no.
404            let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
405            // This ping is fire-and-forget. The Pong response is a content-related
406            // server message and must be acknowledged. If the RPC result arrives before
407            // the Pong, the Pong's msg_id is never added to pending_acks. On idle
408            // connections (no subsequent RPCs) the un-acked Pong will eventually cause
409            // Telegram to close the connection. A dedicated always-running reader task
410            // that drains and acks all server messages would fix this permanently; for
411            // now the next rpc_call iteration receives and acks the Pong via pending_acks.
412            let _ = Self::send_abridged(&mut self.stream, &ping_wire, &mut self.frame_kind).await;
413        }
414
415        // Flush pending acks.
416        if !self.pending_acks.is_empty() {
417            let ack_body = build_msgs_ack_body(&self.pending_acks);
418            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
419            let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
420            self.pending_acks.clear();
421        }
422
423        // Track sent msg_id to verify rpc_result.req_msg_id and discard stale responses.
424        let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
425        Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
426        let mut salt_retries = 0u8;
427        let mut session_resets = 0u8;
428        loop {
429            let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
430            let msg = self
431                .enc
432                .unpack(&mut raw)
433                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
434            // Track every received msg_id for acknowledgement.
435            self.pending_acks.push(msg.msg_id);
436            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
437                // Eager flush: too many un-acked messages  - Telegram will close the
438                // connection if we don't ack within its window.
439                let ack_body = build_msgs_ack_body(&self.pending_acks);
440                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
441                let _ =
442                    Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
443                self.pending_acks.clear();
444            }
445            // Salt is updated only on explicit bad_server_salt, not on every message.
446            if msg.body.len() < 4 {
447                return Ok(msg.body);
448            }
449            let mut need_resend = false;
450            let mut need_session_reset = false;
451            let mut bad_msg_code: Option<u32> = None;
452            let mut bad_msg_server_id: Option<i64> = None;
453            // Process all flags before returning: containers may carry
454            // new_session_created + rpc_result together.
455            let scan_result = Self::scan_body(
456                &msg.body,
457                &mut self.enc.salt,
458                &mut need_resend,
459                &mut need_session_reset,
460                &mut bad_msg_code,
461                &mut bad_msg_server_id,
462                Some(sent_msg_id),
463                msg.msg_id,
464            )?;
465            // new_session_created requires seq_no reset to 0.
466            if need_session_reset {
467                session_resets += 1;
468                if session_resets > 2 {
469                    return Err(InvocationError::Deserialize(
470                        "new_session_created: exceeded 2 resets".into(),
471                    ));
472                }
473                if !self.pending_acks.is_empty() {
474                    let ack_body = build_msgs_ack_body(&self.pending_acks);
475                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
476                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
477                        .await;
478                    self.pending_acks.clear();
479                }
480                // Keep the current session sequence. new_session_created updates the
481                // server salt and may require resending stale requests, but it does
482                // not require zeroing the local MTProto seq counter.
483                if scan_result.is_none() {
484                    // No result yet; resend using the current MTProto sequence.
485                    tracing::debug!(
486                        "[ferogram::sender] new_session_created: resending request (attempt {session_resets}/2)"
487                    );
488                    let (wire, new_id) = self.enc.pack_with_msg_id(req);
489                    sent_msg_id = new_id;
490                    Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
491                }
492                // If scan_result.is_some(), the result arrived in the same container
493                // as new_session_created; session has been reset for future calls,
494                // fall through to return the result.
495            } else if need_resend {
496                // Apply seq_no / time corrections from bad_msg_notification.
497                match bad_msg_code {
498                    Some(16) | Some(17) => {
499                        if let Some(srv_id) = bad_msg_server_id {
500                            self.enc.correct_time_offset(srv_id);
501                        }
502                        // Do not call undo_seq_no here. Reusing the same seq_no on a
503                        // retry violates MTProto monotonicity; the server may reject
504                        // with code 32. Let the next pack_with_msg_id assign the next
505                        // available odd seq_no for the resent message.
506                    }
507                    Some(32) | Some(33) => {
508                        // correct_seq_no does a full session reset (new session_id,
509                        // seq_no=0) instead of magic +/- offsets.
510                        self.enc
511                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
512                    }
513                    _ => {
514                        // bad_server_salt or bad_msg code 48
515                        self.enc.undo_seq_no();
516                    }
517                }
518                salt_retries += 1;
519                if salt_retries >= 5 {
520                    return Err(InvocationError::Deserialize(
521                        "bad_server_salt/bad_msg: exceeded 5 retries".into(),
522                    ));
523                }
524                tracing::debug!(
525                    "[ferogram::sender] resending transfer request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
526                );
527                if !self.pending_acks.is_empty() {
528                    let ack_body = build_msgs_ack_body(&self.pending_acks);
529                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
530                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
531                        .await;
532                    self.pending_acks.clear();
533                }
534                let (wire, new_id) = self.enc.pack_with_msg_id(req);
535                sent_msg_id = new_id;
536                Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
537            }
538            if let Some(result) = scan_result {
539                metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
540                metrics::histogram!("ferogram.rpc_latency_ms")
541                    .record(_t0.elapsed().as_millis() as f64);
542                return Ok(result);
543            }
544        }
545    }
546    ///
547    /// Returns `Ok(Some(bytes))` when rpc_result is found.
548    /// Returns `Ok(None)` for informational messages (continue reading).
549    /// Returns `Err` for rpc_error or parse failures.
550    ///
551    /// Output flags:
552    /// - `need_resend`: set for bad_server_salt / bad_msg_notification (codes 16/17/32/33/48)
553    /// - `need_session_reset`: set for new_session_created (seq_no must reset to 0)
554    /// - `bad_msg_code`: error_code from bad_msg_notification for caller to apply correction
555    /// - `bad_msg_server_id`: server msg_id for time-offset correction (codes 16/17)
556    /// - `server_msg_id`: outer frame msg_id for time-offset correction (codes 16/17).
557    ///   Must be msg.msg_id from the caller, not bad_msg_id (client clock, not server's).
558    #[allow(clippy::too_many_arguments)]
559    fn scan_body(
560        body: &[u8],
561        salt: &mut i64,
562        need_resend: &mut bool,
563        need_session_reset: &mut bool,
564        bad_msg_code: &mut Option<u32>,
565        bad_msg_server_id: &mut Option<i64>,
566        sent_msg_id: Option<i64>,
567        server_msg_id: i64,
568    ) -> Result<Option<Vec<u8>>, InvocationError> {
569        if body.len() < 4 {
570            return Ok(None);
571        }
572        let cid = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
573        match cid {
574            0xf35c6d01 /* rpc_result: CID(4) + req_msg_id(8) + result */ => {
575                if body.len() >= 12
576                    && let Some(expected) = sent_msg_id {
577                        let resp_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 checked above"));
578                        if resp_id != expected {
579                            tracing::debug!(
580                                "[ferogram::sender] rpc_result msg_id mismatch (got {resp_id:#018x}, want {expected:#018x}); skipping this frame"
581                            );
582                            return Ok(None);
583                        }
584                    }
585                let inner = if body.len() >= 12 { &body[12..] } else { body };
586                // Inner body may itself be gzip_packed (e.g. help.Config inside rpc_result).
587                if inner.len() >= 4
588                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 4 checked above")) == 0x3072cfa1
589                {
590                    let mut dummy_salt = *salt;
591                    let mut nr = false; let mut nsr = false;
592                    let mut bc = None; let mut bsi = None;
593                    if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
594                        return Ok(Some(r));
595                    }
596                    // Unwrap the gzip directly and return the decompressed bytes.
597                    if let Some(compressed) = ferogram_connect::tl_read_bytes(&inner[4..])
598                        && let Ok(out) = ferogram_connect::gz_inflate(&compressed)
599                    {
600                        return Ok(Some(out));
601                    }
602                    return Ok(None);
603                }
604                if inner.len() >= 8
605                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 8 checked above")) == 0x2144ca19
606                {
607                    let code = i32::from_le_bytes(inner[4..8].try_into().expect("inner.len() >= 8 checked above"));
608                    let message = ferogram_connect::tl_read_string(&inner[8..]).unwrap_or_default();
609                    return Err(InvocationError::Rpc(
610                        crate::errors::RpcError::from_telegram(code, &message),
611                    ));
612                }
613                Ok(Some(inner.to_vec()))
614            }
615            0x2144ca19 /* rpc_error */ => {
616                if body.len() < 8 {
617                    return Err(InvocationError::Deserialize("rpc_error short".into()));
618                }
619                let code = i32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 checked above"));
620                let message = ferogram_connect::tl_read_string(&body[8..]).unwrap_or_default();
621                Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
622            }
623            0xedab447b /* bad_server_salt */ => {
624                // bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int error_code:int new_server_salt:long
625                if body.len() >= 28 {
626                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
627                    let new_salt   = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
628                    // Only apply new salt when bad_msg_id matches our sent request;
629                    // stale frames from prior requests must not corrupt the current salt.
630                    if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
631                        *salt = new_salt;
632                        *need_resend = true;
633                    }
634                }
635                Ok(None)
636            }
637            0x9ec20908 /* new_session_created */ => {
638                // new_session_created#9ec20908 first_msg_id:long unique_id:long server_salt:long
639                // Signal need_session_reset so the caller resets seq_no before resending.
640                if body.len() >= 28 {
641                    let first_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
642                    let unique_id    = i64::from_le_bytes(body[12..20].try_into().expect("body.len() >= 28 checked above"));
643                    let server_salt  = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
644                    tracing::debug!(
645                        unique_id = format_args!("{unique_id:#018x}"),
646                        first_msg_id,
647                        salt = server_salt,
648                        "[ferogram::sender] new_session_created: server opened fresh session"
649                    );
650                    *salt = server_salt;
651                    // Only reset if the pending request predates the server's new session.
652                    // If sent_msg_id == first_msg_id (fresh worker conn on first send),
653                    // the server will reply with our current session_id. Unconditionally
654                    // calling reset_session() here changes the id, causing the response
655                    // decrypt to fail with session_id mismatch.
656                    if sent_msg_id.is_some_and(|id| id < first_msg_id) {
657                        *need_session_reset = true;
658                    }
659                }
660                Ok(None)
661            }
662            0xa7eff811 /* bad_msg_notification */ => {
663                // bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int error_code:int
664                //
665                // TL layout: body[4..12]=bad_msg_id, body[12..16]=bad_msg_seqno,
666                // body[16..20]=error_code. Previous code read [12..16] as error_code
667                // (bad_msg_seqno), so error matching always compared the wrong field.
668                if body.len() >= 20 {
669                    let bad_msg_id  = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 20 checked above"));
670                    // body[12..16] = bad_msg_seqno, not used for recovery.
671                    let error_code  = u32::from_le_bytes(body[16..20].try_into().expect("body.len() >= 20 checked above"));
672                    tracing::debug!(
673                        bad_msg_id = format_args!("{bad_msg_id:#018x}"),
674                        error_code,
675                        "[ferogram::sender] bad_msg_notification received"
676                    );
677                    match error_code {
678                        16 | 17 => {
679                            // msg_id too low/high: time-offset correction needed.
680                            // server_msg_id upper 32 bits = server Unix timestamp.
681                            // bad_msg_id carries the client's clock, not the server's.
682                            *bad_msg_code = Some(error_code);
683                            *bad_msg_server_id = Some(server_msg_id);
684                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
685                        }
686                        32 | 33 => {
687                            // seq_no wrong.
688                            *bad_msg_code = Some(error_code);
689                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
690                        }
691                        48 => {
692                            // bad_msg code 48 = incorrect server salt. Per spec, this
693                            // arrives together with a bad_server_salt frame in the same
694                            // container that carries the new salt. If bad_server_salt was
695                            // already processed, *salt is updated and the resend uses the
696                            // correct value. If not (partial container), resend once
697                            // conservatively; the retry loop's 5-attempt cap prevents a loop.
698                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
699                            tracing::debug!(
700                                "[ferogram::sender] bad_msg code 48 (wrong server salt): will resend with updated salt"
701                            );
702                        }
703                        _ => {
704                            // Unknown code; resend to avoid the loop stalling.
705                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
706                        }
707                    }
708                }
709                Ok(None)
710            }
711            0x347773c5 /* pong */ => {
712                // Pong is returned for both internal PingDelayDisconnect (fire-and-forget)
713                // and user-invoked Ping (which has a pending invoke future waiting).
714                // pong layout: CID(4) + msg_id(8) + ping_id(8)
715                // pong.msg_id is the msg_id of the original ping request.
716                // Route back to the caller when it matches the pending sent_msg_id.
717                if body.len() >= 12
718                    && let Some(expected) = sent_msg_id
719                {
720                    let pong_req_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 for pong"));
721                    if pong_req_id == expected {
722                        return Ok(Some(body.to_vec()));
723                    }
724                }
725                // Internal keepalive pong - discard.
726                Ok(None)
727            }
728            0x73f1f8dc /* msg_container */ => {
729                if body.len() < 8 {
730                    return Ok(None);
731                }
732                let count = u32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 for msg_container")) as usize;
733                let mut pos = 8usize;
734                // Do not early-return: containers may bundle new_session_created + rpc_result
735                // together; all items must be processed so session/salt flags are observed.
736                let mut found: Option<Vec<u8>> = None;
737                for _ in 0..count {
738                    if pos + 16 > body.len() { break; }
739                    let inner_bytes =
740                        u32::from_le_bytes(body[pos + 12..pos + 16].try_into().expect("pos+16 <= body.len() checked above")) as usize;
741                    pos += 16;
742                    if pos + inner_bytes > body.len() { break; }
743                    let inner = &body[pos..pos + inner_bytes];
744                    pos += inner_bytes;
745                    if found.is_none() {
746                        if let Some(r) = Self::scan_body(inner, salt, need_resend,
747                            need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
748                            server_msg_id)?
749                        {
750                            found = Some(r);
751                            // Do NOT return  - continue processing remaining items so that
752                            // session/salt flags from co-arriving messages are observed.
753                        }
754                    } else {
755                        // Result already captured; still process remaining items for
756                        // side-effect flags (salt, session reset, bad_msg). Pass
757                        // sent_msg_id so the req_msg_id guard still filters stale
758                        // rpc_results. Passing None would bypass the guard and allow
759                        // a stale response to overwrite `found` on the next iteration.
760                        let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
761                                                bad_msg_code, bad_msg_server_id, sent_msg_id,
762                                                server_msg_id)?;
763                    }
764                }
765                Ok(found)
766            }
767            0x3072cfa1 /* gzip_packed */ => {
768                // Decompress and recurse: server wraps large responses in gzip_packed.
769                if let Some(compressed) = ferogram_connect::tl_read_bytes(&body[4..])
770                    && let Ok(decompressed) = ferogram_connect::gz_inflate(&compressed)
771                    && !decompressed.is_empty()
772                {
773                    return Self::scan_body(
774                        &decompressed, salt,
775                        need_resend, need_session_reset,
776                        bad_msg_code, bad_msg_server_id,
777                        sent_msg_id,
778                        server_msg_id,
779                    );
780                }
781                Ok(None)
782            }
783            _ => Ok(None),
784        }
785    }
786
787    /// Like `rpc_call` but accepts any `Serializable` type (not just `RemoteCall`).
788    pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
789        &mut self,
790        req: &S,
791    ) -> Result<Vec<u8>, InvocationError> {
792        if !self.pending_acks.is_empty() {
793            let ack_body = build_msgs_ack_body(&self.pending_acks);
794            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
795            let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
796            self.pending_acks.clear();
797        }
798        let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
799        Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
800        let mut salt_retries = 0u8;
801        let mut session_resets = 0u8;
802        loop {
803            let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
804            let msg = self
805                .enc
806                .unpack(&mut raw)
807                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
808            self.pending_acks.push(msg.msg_id);
809            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
810                let ack_body = build_msgs_ack_body(&self.pending_acks);
811                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
812                let _ =
813                    Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
814                self.pending_acks.clear();
815            }
816            // Salt updated only on explicit bad_server_salt, not on every message.
817            if msg.body.len() < 4 {
818                return Ok(msg.body);
819            }
820            let mut need_resend = false;
821            let mut need_session_reset = false;
822            let mut bad_msg_code: Option<u32> = None;
823            let mut bad_msg_server_id: Option<i64> = None;
824            // Save result before handling flags; apply all before returning.
825            let scan_result = Self::scan_body(
826                &msg.body,
827                &mut self.enc.salt,
828                &mut need_resend,
829                &mut need_session_reset,
830                &mut bad_msg_code,
831                &mut bad_msg_server_id,
832                Some(sent_msg_id),
833                msg.msg_id,
834            )?;
835            if need_session_reset {
836                session_resets += 1;
837                if session_resets > 2 {
838                    return Err(InvocationError::Deserialize(
839                        "new_session_created (serializable): exceeded 2 resets".into(),
840                    ));
841                }
842                if !self.pending_acks.is_empty() {
843                    let ack_body = build_msgs_ack_body(&self.pending_acks);
844                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
845                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
846                        .await;
847                    self.pending_acks.clear();
848                }
849                if scan_result.is_none() {
850                    let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
851                    sent_msg_id = new_id;
852                    Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
853                }
854            } else if need_resend {
855                match bad_msg_code {
856                    Some(16) | Some(17) => {
857                        if let Some(srv_id) = bad_msg_server_id {
858                            self.enc.correct_time_offset(srv_id);
859                        }
860                        // Do not call undo_seq_no (see rpc_call for explanation).
861                    }
862                    Some(32) | Some(33) => {
863                        self.enc
864                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
865                    }
866                    _ => {
867                        self.enc.undo_seq_no();
868                    }
869                }
870                salt_retries += 1;
871                if salt_retries >= 5 {
872                    return Err(InvocationError::Deserialize(
873                        "bad_server_salt (serializable): exceeded 5 retries".into(),
874                    ));
875                }
876                tracing::debug!(
877                    "[ferogram::sender] resending serializable request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
878                );
879                if !self.pending_acks.is_empty() {
880                    let ack_body = build_msgs_ack_body(&self.pending_acks);
881                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
882                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
883                        .await;
884                    self.pending_acks.clear();
885                }
886                let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
887                sent_msg_id = new_id;
888                Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
889            }
890            if let Some(result) = scan_result {
891                return Ok(result);
892            }
893        }
894    }
895
896    /// Send pre-serialized raw bytes and receive the raw response.
897    /// Used by CDN download connections (no MTProto encryption layer).
898    pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
899        Self::send_abridged(&mut self.stream, body, &mut self.frame_kind).await?;
900        Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await
901    }
902
903    /// Send a framed message using the active FrameKind.
904    /// All transport variants (Abridged, Intermediate, Full, Obfuscated, …) are handled.
905    async fn send_abridged(
906        stream: &mut TcpStream,
907        data: &[u8],
908        kind: &mut FrameKind,
909    ) -> Result<(), InvocationError> {
910        use tokio::io::AsyncWriteExt as _;
911        match kind {
912            FrameKind::Abridged => {
913                let words = data.len() / 4;
914                let mut frame = if words < 0x7f {
915                    let mut v = Vec::with_capacity(1 + data.len());
916                    v.push(words as u8);
917                    v
918                } else {
919                    let mut v = Vec::with_capacity(4 + data.len());
920                    v.extend_from_slice(&[
921                        0x7f,
922                        (words & 0xff) as u8,
923                        ((words >> 8) & 0xff) as u8,
924                        ((words >> 16) & 0xff) as u8,
925                    ]);
926                    v
927                };
928                frame.extend_from_slice(data);
929                stream.write_all(&frame).await?;
930            }
931            FrameKind::Intermediate => {
932                let mut frame = Vec::with_capacity(4 + data.len());
933                frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
934                frame.extend_from_slice(data);
935                stream.write_all(&frame).await?;
936            }
937            FrameKind::Full { send_seqno, .. } => {
938                let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
939                let total_len = (data.len() as u32) + 12;
940                let mut packet = Vec::with_capacity(total_len as usize);
941                packet.extend_from_slice(&total_len.to_le_bytes());
942                packet.extend_from_slice(&seq.to_le_bytes());
943                packet.extend_from_slice(data);
944                let crc = ferogram_connect::crc32_ieee(&packet);
945                packet.extend_from_slice(&crc.to_le_bytes());
946                stream.write_all(&packet).await?;
947            }
948            FrameKind::Obfuscated { cipher } => {
949                let words = data.len() / 4;
950                let mut frame = if words < 0x7f {
951                    let mut v = Vec::with_capacity(1 + data.len());
952                    v.push(words as u8);
953                    v
954                } else {
955                    let mut v = Vec::with_capacity(4 + data.len());
956                    v.extend_from_slice(&[
957                        0x7f,
958                        (words & 0xff) as u8,
959                        ((words >> 8) & 0xff) as u8,
960                        ((words >> 16) & 0xff) as u8,
961                    ]);
962                    v
963                };
964                frame.extend_from_slice(data);
965                cipher.lock().await.encrypt(&mut frame);
966                stream.write_all(&frame).await?;
967            }
968            FrameKind::PaddedIntermediate { cipher } => {
969                let mut pad_len_buf = [0u8; 1];
970                ferogram_crypto::fill_random(&mut pad_len_buf);
971                let pad_len = (pad_len_buf[0] & 0x0f) as usize;
972                let total_payload = data.len() + pad_len;
973                let mut frame = Vec::with_capacity(4 + total_payload);
974                frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
975                frame.extend_from_slice(data);
976                let mut pad = vec![0u8; pad_len];
977                ferogram_crypto::fill_random(&mut pad);
978                frame.extend_from_slice(&pad);
979                cipher.lock().await.encrypt(&mut frame);
980                stream.write_all(&frame).await?;
981            }
982            FrameKind::FakeTls { cipher } => {
983                const TLS_APP_DATA: u8 = 0x17;
984                const TLS_VER: [u8; 2] = [0x03, 0x03];
985                const CHUNK: usize = 2878;
986                let mut locked = cipher.lock().await;
987                for chunk in data.chunks(CHUNK) {
988                    let chunk_len = chunk.len() as u16;
989                    let mut record = Vec::with_capacity(5 + chunk.len());
990                    record.push(TLS_APP_DATA);
991                    record.extend_from_slice(&TLS_VER);
992                    record.extend_from_slice(&chunk_len.to_be_bytes());
993                    record.extend_from_slice(chunk);
994                    locked.encrypt(&mut record[5..]);
995                    stream.write_all(&record).await?;
996                }
997            }
998        }
999        Ok(())
1000    }
1001
1002    /// Receive a framed message using the active FrameKind (with 60-second timeout).
1003    async fn recv_abridged(
1004        stream: &mut TcpStream,
1005        kind: &mut FrameKind,
1006    ) -> Result<Vec<u8>, InvocationError> {
1007        use tokio::time::{Duration, timeout};
1008        const RECV_TIMEOUT: Duration = Duration::from_secs(60);
1009
1010        macro_rules! tread {
1011            ($buf:expr) => {
1012                timeout(RECV_TIMEOUT, stream.read_exact($buf))
1013                    .await
1014                    .map_err(|_| {
1015                        InvocationError::Io(std::io::Error::new(
1016                            std::io::ErrorKind::TimedOut,
1017                            "transfer recv: timeout (60 s)",
1018                        ))
1019                    })??
1020            };
1021        }
1022
1023        match kind {
1024            FrameKind::Abridged => {
1025                let mut h = [0u8; 1];
1026                tread!(&mut h);
1027                let words = if h[0] == 0x7f {
1028                    let mut b = [0u8; 3];
1029                    tread!(&mut b);
1030                    let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
1031                    if w == 1 {
1032                        let mut code_buf = [0u8; 4];
1033                        tread!(&mut code_buf);
1034                        let code = i32::from_le_bytes(code_buf);
1035                        return Err(InvocationError::Rpc(
1036                            crate::errors::RpcError::from_telegram(code, "transport error"),
1037                        ));
1038                    }
1039                    w
1040                } else {
1041                    h[0] as usize
1042                };
1043                let mut buf = vec![0u8; words * 4];
1044                tread!(&mut buf);
1045                if buf.len() == 4 {
1046                    let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1047                    if code < 0 {
1048                        return Err(InvocationError::Rpc(
1049                            crate::errors::RpcError::from_telegram(code, "transport error"),
1050                        ));
1051                    }
1052                }
1053                Ok(buf)
1054            }
1055            FrameKind::Intermediate => {
1056                let mut len_buf = [0u8; 4];
1057                tread!(&mut len_buf);
1058                let len_i32 = i32::from_le_bytes(len_buf);
1059                if len_i32 < 0 {
1060                    return Err(InvocationError::Rpc(
1061                        crate::errors::RpcError::from_telegram(len_i32, "transport error"),
1062                    ));
1063                }
1064                let mut buf = vec![0u8; len_i32 as usize];
1065                tread!(&mut buf);
1066                Ok(buf)
1067            }
1068            FrameKind::Full { recv_seqno, .. } => {
1069                let mut len_buf = [0u8; 4];
1070                tread!(&mut len_buf);
1071                let total_len_i32 = i32::from_le_bytes(len_buf);
1072                if total_len_i32 < 0 {
1073                    return Err(InvocationError::Rpc(
1074                        crate::errors::RpcError::from_telegram(total_len_i32, "transport error"),
1075                    ));
1076                }
1077                let total_len = total_len_i32 as usize;
1078                if total_len < 12 {
1079                    return Err(InvocationError::Deserialize(
1080                        "Full transport: packet too short".into(),
1081                    ));
1082                }
1083                let mut rest = vec![0u8; total_len - 4];
1084                tread!(&mut rest);
1085                let (body, crc_bytes) = rest.split_at(rest.len() - 4);
1086                let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
1087                let mut check_input = Vec::with_capacity(4 + body.len());
1088                check_input.extend_from_slice(&len_buf);
1089                check_input.extend_from_slice(body);
1090                let actual_crc = ferogram_connect::crc32_ieee(&check_input);
1091                if actual_crc != expected_crc {
1092                    return Err(InvocationError::Deserialize(format!(
1093                        "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
1094                    )));
1095                }
1096                let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
1097                let expected_seq = recv_seqno.load(std::sync::atomic::Ordering::Relaxed);
1098                if recv_seq != expected_seq {
1099                    return Err(InvocationError::Deserialize(format!(
1100                        "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
1101                    )));
1102                }
1103                recv_seqno.store(
1104                    expected_seq.wrapping_add(1),
1105                    std::sync::atomic::Ordering::Relaxed,
1106                );
1107                Ok(body[4..].to_vec())
1108            }
1109            FrameKind::Obfuscated { cipher } => {
1110                let mut h = [0u8; 1];
1111                tread!(&mut h);
1112                cipher.lock().await.decrypt(&mut h);
1113                let words = if h[0] == 0x7f {
1114                    let mut b = [0u8; 3];
1115                    tread!(&mut b);
1116                    cipher.lock().await.decrypt(&mut b);
1117                    let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
1118                    if w == 1 {
1119                        let mut code_buf = [0u8; 4];
1120                        tread!(&mut code_buf);
1121                        cipher.lock().await.decrypt(&mut code_buf);
1122                        let code = i32::from_le_bytes(code_buf);
1123                        return Err(InvocationError::Rpc(
1124                            crate::errors::RpcError::from_telegram(code, "transport error"),
1125                        ));
1126                    }
1127                    w
1128                } else {
1129                    h[0] as usize
1130                };
1131                let mut buf = vec![0u8; words * 4];
1132                tread!(&mut buf);
1133                cipher.lock().await.decrypt(&mut buf);
1134                if buf.len() == 4 {
1135                    let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1136                    if code < 0 {
1137                        return Err(InvocationError::Rpc(
1138                            crate::errors::RpcError::from_telegram(code, "transport error"),
1139                        ));
1140                    }
1141                }
1142                Ok(buf)
1143            }
1144            FrameKind::PaddedIntermediate { cipher } => {
1145                let mut len_buf = [0u8; 4];
1146                tread!(&mut len_buf);
1147                cipher.lock().await.decrypt(&mut len_buf);
1148                let total_len = i32::from_le_bytes(len_buf);
1149                if total_len < 0 {
1150                    return Err(InvocationError::Rpc(
1151                        crate::errors::RpcError::from_telegram(total_len, "transport error"),
1152                    ));
1153                }
1154                let mut buf = vec![0u8; total_len as usize];
1155                tread!(&mut buf);
1156                cipher.lock().await.decrypt(&mut buf);
1157                if buf.len() >= 24 {
1158                    let pad = (buf.len() - 24) % 16;
1159                    buf.truncate(buf.len() - pad);
1160                }
1161                Ok(buf)
1162            }
1163            FrameKind::FakeTls { cipher } => {
1164                let mut hdr = [0u8; 5];
1165                tread!(&mut hdr);
1166                if hdr[0] != 0x17 {
1167                    return Err(InvocationError::Deserialize(format!(
1168                        "FakeTLS: unexpected record type 0x{:02x}",
1169                        hdr[0]
1170                    )));
1171                }
1172                let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
1173                let mut buf = vec![0u8; payload_len];
1174                tread!(&mut buf);
1175                cipher.lock().await.decrypt(&mut buf);
1176                Ok(buf)
1177            }
1178        }
1179    }
1180
1181    /// Send a plaintext (DH handshake) frame, padding to 4-byte alignment for
1182    /// abridged-family transports. Full and Intermediate don't need padding.
1183    async fn send_plain_frame(
1184        stream: &mut TcpStream,
1185        data: &[u8],
1186        kind: &mut FrameKind,
1187    ) -> Result<(), InvocationError> {
1188        // Abridged/Obfuscated use word-count (len/4); must be 4-byte aligned.
1189        // Full and Intermediate carry the exact byte length so no padding needed.
1190        let needs_align = matches!(kind, FrameKind::Abridged | FrameKind::Obfuscated { .. });
1191        if needs_align && !data.len().is_multiple_of(4) {
1192            let mut padded = data.to_vec();
1193            let pad = 4 - (data.len() % 4);
1194            padded.resize(data.len() + pad, 0);
1195            Self::send_abridged(stream, &padded, kind).await
1196        } else {
1197            Self::send_abridged(stream, data, kind).await
1198        }
1199    }
1200
1201    async fn recv_plain_frame<T: Deserializable>(
1202        stream: &mut TcpStream,
1203        kind: &mut FrameKind,
1204    ) -> Result<T, InvocationError> {
1205        let raw = Self::recv_abridged(stream, kind).await?;
1206        if raw.len() == 4 {
1207            let code = i32::from_le_bytes(raw[..4].try_into().unwrap());
1208            if code < 0 {
1209                return Err(InvocationError::Deserialize(format!(
1210                    "server transport error during DH: code {code}"
1211                )));
1212            }
1213        }
1214        if raw.len() < 20 {
1215            return Err(InvocationError::Deserialize("plain frame too short".into()));
1216        }
1217        if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1218            return Err(InvocationError::Deserialize(
1219                "expected auth_key_id=0 in plaintext".into(),
1220            ));
1221        }
1222        let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1223        if raw.len() < 20 + body_len {
1224            return Err(InvocationError::Deserialize(format!(
1225                "plain frame truncated: have {} bytes, need {}",
1226                raw.len(),
1227                20 + body_len
1228            )));
1229        }
1230        let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1231        T::deserialize(&mut cur).map_err(Into::into)
1232    }
1233}