Skip to main content

ferogram_mtsender/
sender.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13use ferogram_connect::FrameKind;
14use ferogram_mtproto::{
15    EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
16};
17use ferogram_tl_types as tl;
18use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
19use tokio::io::AsyncReadExt;
20use tokio::net::TcpStream;
21
22use crate::errors::InvocationError;
23use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
24use ferogram_connect::TransportKind;
25// metrics and tracing
26#[allow(unused_imports)]
27use metrics::{counter, histogram};
28
29/// A single encrypted connection to one Telegram DC.
30/// Un-acked server msg_ids to accumulate before eagerly flushing a `msgs_ack` frame.
31const PENDING_ACKS_THRESHOLD: usize = 10;
32
33/// `PingDelayDisconnect` interval for worker connections (in GetFile chunks).
34/// Keeps the socket alive within Telegram's 75-second idle-disconnect window.
35const PING_EVERY_N_CHUNKS: u32 = 5;
36
37pub struct DcConnection {
38    stream: TcpStream,
39    enc: EncryptedSession,
40    pending_acks: Vec<i64>,
41    call_count: u32,
42    /// Active framing kind for this connection.
43    frame_kind: FrameKind,
44    /// Persistent dedup ring that outlives individual EncryptedSessions.
45    #[allow(dead_code)]
46    seen_msg_ids: SeenMsgIds,
47}
48
49impl DcConnection {
50    /// Race Obfuscated / Abridged / Http transports and return the first to succeed.
51    #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
52    pub async fn connect_fastest(
53        addr: &str,
54        socks5: Option<&ferogram_connect::Socks5Config>,
55        dc_id: i16,
56    ) -> Result<(Self, &'static str), InvocationError> {
57        use tokio::task::JoinSet;
58        let addr = addr.to_owned();
59        let socks5 = socks5.cloned();
60        tracing::debug!(
61            "[ferogram::sender] probing {addr} with Full, Obfuscated, and Abridged transports in parallel"
62        );
63        let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
64            JoinSet::new();
65
66        {
67            let a = addr.clone();
68            let s = socks5.clone();
69            set.spawn(async move {
70                Ok((
71                    DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Full, dc_id).await?,
72                    "Full",
73                ))
74            });
75        }
76        {
77            let a = addr.clone();
78            let s = socks5.clone();
79            set.spawn(async move {
80                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
81                Ok((
82                    DcConnection::connect_raw(
83                        &a,
84                        s.as_ref(),
85                        &TransportKind::Obfuscated { secret: None },
86                        dc_id,
87                    )
88                    .await?,
89                    "Obfuscated",
90                ))
91            });
92        }
93        {
94            let a = addr.clone();
95            let s = socks5.clone();
96            set.spawn(async move {
97                tokio::time::sleep(std::time::Duration::from_millis(400)).await;
98                Ok((
99                    DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
100                        .await?,
101                    "Abridged",
102                ))
103            });
104        }
105        {
106            let a = addr.clone();
107            set.spawn(async move {
108                tokio::time::sleep(std::time::Duration::from_millis(800)).await;
109                Ok((
110                    DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
111                    "Http",
112                ))
113            });
114        }
115
116        let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
117        while let Some(outcome) = set.join_next().await {
118            match outcome {
119                Ok(Ok((conn, label))) => {
120                    set.abort_all();
121                    return Ok((conn, label));
122                }
123                Ok(Err(e)) => {
124                    last_err = e;
125                }
126                Err(e) if e.is_cancelled() => {}
127                Err(_) => {}
128            }
129        }
130        Err(last_err)
131    }
132
133    /// Connect and perform full DH handshake.
134    #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
135    pub async fn connect_raw(
136        addr: &str,
137        socks5: Option<&ferogram_connect::Socks5Config>,
138        transport: &TransportKind,
139        dc_id: i16,
140    ) -> Result<Self, InvocationError> {
141        tracing::debug!("[ferogram::sender] connecting to {addr} with known auth key");
142        let (stream, frame_kind, enc) =
143            ferogram_connect::connect_to_dc(addr, dc_id, transport, socks5, None).await?;
144
145        tracing::debug!("[ferogram::sender] DH complete, auth key established for {addr}");
146        let seen = new_seen_msg_ids();
147        Ok(Self {
148            stream,
149            frame_kind,
150            enc: EncryptedSession::with_seen(
151                enc.auth_key_bytes(),
152                enc.salt,
153                enc.time_offset,
154                seen.clone(),
155            ),
156            pending_acks: Vec::new(),
157            call_count: 0,
158            seen_msg_ids: seen,
159        })
160    }
161
162    /// Connect with an already-known auth key (no DH needed).
163    /// If `pfs` is true, performs a temp-key DH bind before any RPCs.
164    #[allow(clippy::too_many_arguments)]
165    pub async fn connect_with_key(
166        addr: &str,
167        auth_key: [u8; 256],
168        first_salt: i64,
169        time_offset: i32,
170        socks5: Option<&ferogram_connect::Socks5Config>,
171        mtproxy: Option<&ferogram_connect::MtProxyConfig>,
172        transport: &TransportKind,
173        dc_id: i16,
174        pfs: bool,
175    ) -> Result<Self, InvocationError> {
176        // ferogram-connect owns TCP open + keepalive + transport init.
177        let (mut stream, mut frame_kind) =
178            ferogram_connect::Connection::open_stream_pub(addr, dc_id, transport, socks5, mtproxy)
179                .await?;
180
181        if pfs {
182            tracing::debug!("[ferogram::sender] PFS: binding temporary key for DC{dc_id}");
183            match Self::do_pool_pfs_bind(&mut stream, &mut frame_kind, &auth_key, dc_id).await {
184                Ok(temp_enc) => {
185                    tracing::debug!("[ferogram::sender] PFS: temporary key bound for DC{dc_id}");
186                    return Ok(Self {
187                        stream,
188                        frame_kind,
189                        enc: temp_enc,
190                        pending_acks: Vec::new(),
191                        call_count: 0,
192                        seen_msg_ids: new_seen_msg_ids(),
193                    });
194                }
195                Err(e) => {
196                    tracing::warn!(
197                        "[ferogram::sender] PFS bind failed for DC{dc_id} ({e}); using permanent key"
198                    );
199                    return Err(e);
200                }
201            }
202        }
203
204        let seen = new_seen_msg_ids();
205        Ok(Self {
206            stream,
207            frame_kind,
208            enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
209            pending_acks: Vec::new(),
210            call_count: 0,
211            seen_msg_ids: seen,
212        })
213    }
214
215    /// Temp-key DH handshake + auth.bindTempAuthKey on an existing stream.
216    async fn do_pool_pfs_bind(
217        stream: &mut tokio::net::TcpStream,
218        kind: &mut FrameKind,
219        perm_auth_key: &[u8; 256],
220        dc_id: i16,
221    ) -> Result<EncryptedSession, InvocationError> {
222        use ferogram_mtproto::{
223            auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
224            serialize_bind_temp_auth_key,
225        };
226        const TEMP_EXPIRES: i32 = 86_400; // 24 h
227
228        // temp-key DH
229        let mut plain = Session::new();
230
231        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
232        Self::send_plain_frame(stream, &plain.pack(&req1).to_plaintext_bytes(), kind).await?;
233        let res_pq: tl::enums::ResPq = Self::recv_plain_frame(stream, kind).await?;
234
235        let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
236            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
237        Self::send_plain_frame(stream, &plain.pack(&req2).to_plaintext_bytes(), kind).await?;
238        let dh: tl::enums::ServerDhParams = Self::recv_plain_frame(stream, kind).await?;
239
240        let (req3, s3) =
241            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
242        Self::send_plain_frame(stream, &plain.pack(&req3).to_plaintext_bytes(), kind).await?;
243        let ans: tl::enums::SetClientDhParamsAnswer = Self::recv_plain_frame(stream, kind).await?;
244
245        let done = {
246            let mut result =
247                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
248            let mut attempts = 0u8;
249            loop {
250                match result {
251                    ferogram_mtproto::FinishResult::Done(d) => break d,
252                    ferogram_mtproto::FinishResult::Retry {
253                        retry_id,
254                        dh_params,
255                        nonce,
256                        server_nonce,
257                        new_nonce,
258                    } => {
259                        attempts += 1;
260                        if attempts >= 5 {
261                            return Err(InvocationError::Deserialize(
262                                "PFS pool temp DH retry exceeded 5".into(),
263                            ));
264                        }
265                        let (rr, s3r) = ferogram_mtproto::retry_step3(
266                            &dh_params,
267                            nonce,
268                            server_nonce,
269                            new_nonce,
270                            retry_id,
271                        )
272                        .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
273                        Self::send_plain_frame(stream, &plain.pack(&rr).to_plaintext_bytes(), kind)
274                            .await?;
275                        let ar: tl::enums::SetClientDhParamsAnswer =
276                            Self::recv_plain_frame(stream, kind).await?;
277                        result = auth::finish(s3r, ar)
278                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
279                    }
280                }
281            }
282        };
283
284        let temp_key = done.auth_key;
285        let temp_salt = done.first_salt;
286        let temp_offset = done.time_offset;
287
288        // build bindTempAuthKey body
289        let temp_key_id = auth_key_id_from_key(&temp_key);
290        let perm_key_id = auth_key_id_from_key(perm_auth_key);
291
292        let mut nonce_buf = [0u8; 8];
293        ferogram_crypto::fill_random(&mut nonce_buf);
294        let nonce = i64::from_le_bytes(nonce_buf);
295
296        let server_now = std::time::SystemTime::now()
297            .duration_since(std::time::UNIX_EPOCH)
298            .expect("system clock is before UNIX epoch")
299            .as_secs() as i32
300            + temp_offset;
301        let expires_at = server_now + TEMP_EXPIRES;
302
303        let seen = new_seen_msg_ids();
304        let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
305        let temp_session_id = temp_enc.session_id();
306
307        let msg_id = gen_msg_id();
308        let enc_msg = encrypt_bind_inner(
309            perm_auth_key,
310            msg_id,
311            nonce,
312            temp_key_id,
313            perm_key_id,
314            temp_session_id,
315            expires_at,
316        );
317        let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
318
319        // send encrypted bind request
320        let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
321        Self::send_abridged(stream, &wire, kind).await?;
322
323        // Receive and verify response.
324        // The server may send informational frames first (msgs_ack, new_session_created)
325        // before the actual rpc_result{boolTrue}, so we loop up to 5 frames.
326        for attempt in 0u8..5 {
327            let mut raw = Self::recv_abridged(stream, kind).await?;
328            let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
329                InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
330            })?;
331            match ferogram_connect::decode_bind_response(&decrypted.body) {
332                Ok(()) => {
333                    // bindTempAuthKey succeeds under the temp key; keep the session
334                    // sequence as-is so subsequent RPCs continue from the same MTProto
335                    // message stream.
336                    return Ok(temp_enc);
337                }
338                Err(ref e) if e == "__need_more__" => {
339                    tracing::debug!(
340                        "[ferogram::sender] PFS (DC{dc_id}): got informational frame on attempt {attempt}, reading next"
341                    );
342                    continue;
343                }
344                Err(reason) => {
345                    tracing::error!(
346                        "[ferogram::sender] PFS bind rejected by server for DC{dc_id}: {reason}"
347                    );
348                    return Err(InvocationError::Deserialize(format!(
349                        "auth.bindTempAuthKey (pool): {reason}"
350                    )));
351                }
352            }
353        }
354        Err(InvocationError::Deserialize(
355            "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
356        ))
357    }
358
359    pub fn auth_key_bytes(&self) -> [u8; 256] {
360        self.enc.auth_key_bytes()
361    }
362    pub fn first_salt(&self) -> i64 {
363        self.enc.salt
364    }
365    pub fn time_offset(&self) -> i32 {
366        self.enc.time_offset
367    }
368
369    /// Break the connection into its raw parts so the caller can hand them to
370    /// the pipelined [`crate::sender_task::spawn_sender_task`] instead of using
371    /// this struct's blocking [`rpc_call`](Self::rpc_call).
372    ///
373    /// `DcPool` uses this once a connection has finished its setup (DH, PFS
374    /// bind, initConnection) as a plain `DcConnection`: the pool graduates it
375    /// into a background sender task so every later request on the pool's
376    /// `invoke_on_dc` path pipelines instead of blocking the connection for
377    /// each round trip.
378    pub(crate) fn into_parts(self) -> (TcpStream, FrameKind, EncryptedSession) {
379        (self.stream, self.frame_kind, self.enc)
380    }
381
382    #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
383    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
384        let _t0 = std::time::Instant::now();
385        // Periodic PingDelayDisconnect: sent before the request to piggyback on
386        // the same TCP write window.  Keeps the socket alive across the download.
387        self.call_count += 1;
388        if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
389            let ping_id = self.call_count as i64;
390            let ping_body = build_msgs_ack_ping_body(ping_id);
391            // PingDelayDisconnect is content-related (returns Pong): must use odd seq_no.
392            let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
393            // This ping is fire-and-forget. The Pong response is a content-related
394            // server message and must be acknowledged. If the RPC result arrives before
395            // the Pong, the Pong's msg_id is never added to pending_acks. On idle
396            // connections (no subsequent RPCs) the un-acked Pong will eventually cause
397            // Telegram to close the connection. A dedicated always-running reader task
398            // that drains and acks all server messages would fix this permanently; for
399            // now the next rpc_call iteration receives and acks the Pong via pending_acks.
400            let _ = Self::send_abridged(&mut self.stream, &ping_wire, &mut self.frame_kind).await;
401        }
402
403        // Flush pending acks.
404        if !self.pending_acks.is_empty() {
405            let ack_body = build_msgs_ack_body(&self.pending_acks);
406            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
407            let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
408            self.pending_acks.clear();
409        }
410
411        // Track sent msg_id to verify rpc_result.req_msg_id and discard stale responses.
412        let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
413        Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
414        let mut salt_retries = 0u8;
415        let mut session_resets = 0u8;
416        loop {
417            let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
418            let msg = self
419                .enc
420                .unpack(&mut raw)
421                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
422            // Track every received msg_id for acknowledgement.
423            self.pending_acks.push(msg.msg_id);
424            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
425                // Eager flush: too many un-acked messages  - Telegram will close the
426                // connection if we don't ack within its window.
427                let ack_body = build_msgs_ack_body(&self.pending_acks);
428                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
429                let _ =
430                    Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
431                self.pending_acks.clear();
432            }
433            // Salt is updated only on explicit bad_server_salt, not on every message.
434            if msg.body.len() < 4 {
435                return Ok(msg.body);
436            }
437            let mut need_resend = false;
438            let mut need_session_reset = false;
439            let mut bad_msg_code: Option<u32> = None;
440            let mut bad_msg_server_id: Option<i64> = None;
441            // Process all flags before returning: containers may carry
442            // new_session_created + rpc_result together.
443            let scan_result = Self::scan_body(
444                &msg.body,
445                &mut self.enc.salt,
446                &mut need_resend,
447                &mut need_session_reset,
448                &mut bad_msg_code,
449                &mut bad_msg_server_id,
450                Some(sent_msg_id),
451                msg.msg_id,
452            )?;
453            // new_session_created requires seq_no reset to 0.
454            if need_session_reset {
455                session_resets += 1;
456                if session_resets > 2 {
457                    return Err(InvocationError::Deserialize(
458                        "new_session_created: exceeded 2 resets".into(),
459                    ));
460                }
461                if !self.pending_acks.is_empty() {
462                    let ack_body = build_msgs_ack_body(&self.pending_acks);
463                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
464                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
465                        .await;
466                    self.pending_acks.clear();
467                }
468                // Keep the current session sequence. new_session_created updates the
469                // server salt and may require resending stale requests, but it does
470                // not require zeroing the local MTProto seq counter.
471                if scan_result.is_none() {
472                    // No result yet; resend using the current MTProto sequence.
473                    tracing::debug!(
474                        "[ferogram::sender] new_session_created: resending request (attempt {session_resets}/2)"
475                    );
476                    let (wire, new_id) = self.enc.pack_with_msg_id(req);
477                    sent_msg_id = new_id;
478                    Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
479                }
480                // If scan_result.is_some(), the result arrived in the same container
481                // as new_session_created; session has been reset for future calls,
482                // fall through to return the result.
483            } else if need_resend {
484                // Apply seq_no / time corrections from bad_msg_notification.
485                match bad_msg_code {
486                    Some(16) | Some(17) => {
487                        if let Some(srv_id) = bad_msg_server_id {
488                            self.enc.correct_time_offset(srv_id);
489                        }
490                        // Do not call undo_seq_no here. Reusing the same seq_no on a
491                        // retry violates MTProto monotonicity; the server may reject
492                        // with code 32. Let the next pack_with_msg_id assign the next
493                        // available odd seq_no for the resent message.
494                    }
495                    Some(32) | Some(33) => {
496                        // correct_seq_no does a full session reset (new session_id,
497                        // seq_no=0) instead of magic +/- offsets.
498                        self.enc
499                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
500                    }
501                    _ => {
502                        // bad_server_salt or bad_msg code 48
503                        self.enc.undo_seq_no();
504                    }
505                }
506                salt_retries += 1;
507                if salt_retries >= 5 {
508                    return Err(InvocationError::Deserialize(
509                        "bad_server_salt/bad_msg: exceeded 5 retries".into(),
510                    ));
511                }
512                tracing::debug!(
513                    "[ferogram::sender] resending transfer request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
514                );
515                if !self.pending_acks.is_empty() {
516                    let ack_body = build_msgs_ack_body(&self.pending_acks);
517                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
518                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
519                        .await;
520                    self.pending_acks.clear();
521                }
522                let (wire, new_id) = self.enc.pack_with_msg_id(req);
523                sent_msg_id = new_id;
524                Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
525            }
526            if let Some(result) = scan_result {
527                metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
528                metrics::histogram!("ferogram.rpc_latency_ms")
529                    .record(_t0.elapsed().as_millis() as f64);
530                return Ok(result);
531            }
532        }
533    }
534    ///
535    /// Returns `Ok(Some(bytes))` when rpc_result is found.
536    /// Returns `Ok(None)` for informational messages (continue reading).
537    /// Returns `Err` for rpc_error or parse failures.
538    ///
539    /// Output flags:
540    /// - `need_resend`: set for bad_server_salt / bad_msg_notification (codes 16/17/32/33/48)
541    /// - `need_session_reset`: set for new_session_created (seq_no must reset to 0)
542    /// - `bad_msg_code`: error_code from bad_msg_notification for caller to apply correction
543    /// - `bad_msg_server_id`: server msg_id for time-offset correction (codes 16/17)
544    /// - `server_msg_id`: outer frame msg_id for time-offset correction (codes 16/17).
545    ///   Must be msg.msg_id from the caller, not bad_msg_id (client clock, not server's).
546    #[allow(clippy::too_many_arguments)]
547    fn scan_body(
548        body: &[u8],
549        salt: &mut i64,
550        need_resend: &mut bool,
551        need_session_reset: &mut bool,
552        bad_msg_code: &mut Option<u32>,
553        bad_msg_server_id: &mut Option<i64>,
554        sent_msg_id: Option<i64>,
555        server_msg_id: i64,
556    ) -> Result<Option<Vec<u8>>, InvocationError> {
557        if body.len() < 4 {
558            return Ok(None);
559        }
560        let cid = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
561        match cid {
562            0xf35c6d01 /* rpc_result: CID(4) + req_msg_id(8) + result */ => {
563                if body.len() >= 12
564                    && let Some(expected) = sent_msg_id {
565                        let resp_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 checked above"));
566                        if resp_id != expected {
567                            tracing::debug!(
568                                "[ferogram::sender] rpc_result msg_id mismatch (got {resp_id:#018x}, want {expected:#018x}); skipping this frame"
569                            );
570                            return Ok(None);
571                        }
572                    }
573                let inner = if body.len() >= 12 { &body[12..] } else { body };
574                // Inner body may itself be gzip_packed (e.g. help.Config inside rpc_result).
575                if inner.len() >= 4
576                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 4 checked above")) == 0x3072cfa1
577                {
578                    let mut dummy_salt = *salt;
579                    let mut nr = false; let mut nsr = false;
580                    let mut bc = None; let mut bsi = None;
581                    if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
582                        return Ok(Some(r));
583                    }
584                    // Unwrap the gzip directly and return the decompressed bytes.
585                    if let Some(compressed) = ferogram_connect::tl_read_bytes(&inner[4..])
586                        && let Ok(out) = ferogram_connect::gz_inflate(&compressed)
587                    {
588                        return Ok(Some(out));
589                    }
590                    return Ok(None);
591                }
592                if inner.len() >= 8
593                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 8 checked above")) == 0x2144ca19
594                {
595                    let code = i32::from_le_bytes(inner[4..8].try_into().expect("inner.len() >= 8 checked above"));
596                    let message = ferogram_connect::tl_read_string(&inner[8..]).unwrap_or_default();
597                    return Err(InvocationError::Rpc(
598                        crate::errors::RpcError::from_telegram(code, &message),
599                    ));
600                }
601                Ok(Some(inner.to_vec()))
602            }
603            0x2144ca19 /* rpc_error */ => {
604                if body.len() < 8 {
605                    return Err(InvocationError::Deserialize("rpc_error short".into()));
606                }
607                let code = i32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 checked above"));
608                let message = ferogram_connect::tl_read_string(&body[8..]).unwrap_or_default();
609                Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
610            }
611            0xedab447b /* bad_server_salt */ => {
612                // bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int error_code:int new_server_salt:long
613                if body.len() >= 28 {
614                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
615                    let new_salt   = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
616                    // Only apply new salt when bad_msg_id matches our sent request;
617                    // stale frames from prior requests must not corrupt the current salt.
618                    if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
619                        *salt = new_salt;
620                        *need_resend = true;
621                    }
622                }
623                Ok(None)
624            }
625            0x9ec20908 /* new_session_created */ => {
626                // new_session_created#9ec20908 first_msg_id:long unique_id:long server_salt:long
627                // Signal need_session_reset so the caller resets seq_no before resending.
628                if body.len() >= 28 {
629                    let first_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
630                    let unique_id    = i64::from_le_bytes(body[12..20].try_into().expect("body.len() >= 28 checked above"));
631                    let server_salt  = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
632                    tracing::debug!(
633                        unique_id = format_args!("{unique_id:#018x}"),
634                        first_msg_id,
635                        salt = server_salt,
636                        "[ferogram::sender] new_session_created: server opened fresh session"
637                    );
638                    *salt = server_salt;
639                    // Only reset if the pending request predates the server's new session.
640                    // If sent_msg_id == first_msg_id (fresh worker conn on first send),
641                    // the server will reply with our current session_id. Unconditionally
642                    // calling reset_session() here changes the id, causing the response
643                    // decrypt to fail with session_id mismatch.
644                    if sent_msg_id.is_some_and(|id| id < first_msg_id) {
645                        *need_session_reset = true;
646                    }
647                }
648                Ok(None)
649            }
650            0xa7eff811 /* bad_msg_notification */ => {
651                // bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int error_code:int
652                //
653                // TL layout: body[4..12]=bad_msg_id, body[12..16]=bad_msg_seqno,
654                // body[16..20]=error_code. Previous code read [12..16] as error_code
655                // (bad_msg_seqno), so error matching always compared the wrong field.
656                if body.len() >= 20 {
657                    let bad_msg_id  = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 20 checked above"));
658                    // body[12..16] = bad_msg_seqno, not used for recovery.
659                    let error_code  = u32::from_le_bytes(body[16..20].try_into().expect("body.len() >= 20 checked above"));
660                    tracing::debug!(
661                        bad_msg_id = format_args!("{bad_msg_id:#018x}"),
662                        error_code,
663                        "[ferogram::sender] bad_msg_notification received"
664                    );
665                    match error_code {
666                        16 | 17 => {
667                            // msg_id too low/high: time-offset correction needed.
668                            // server_msg_id upper 32 bits = server Unix timestamp.
669                            // bad_msg_id carries the client's clock, not the server's.
670                            *bad_msg_code = Some(error_code);
671                            *bad_msg_server_id = Some(server_msg_id);
672                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
673                        }
674                        32 | 33 => {
675                            // seq_no wrong.
676                            *bad_msg_code = Some(error_code);
677                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
678                        }
679                        48 => {
680                            // bad_msg code 48 = incorrect server salt. Per spec, this
681                            // arrives together with a bad_server_salt frame in the same
682                            // container that carries the new salt. If bad_server_salt was
683                            // already processed, *salt is updated and the resend uses the
684                            // correct value. If not (partial container), resend once
685                            // conservatively; the retry loop's 5-attempt cap prevents a loop.
686                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
687                            tracing::debug!(
688                                "[ferogram::sender] bad_msg code 48 (wrong server salt): will resend with updated salt"
689                            );
690                        }
691                        _ => {
692                            // Unknown code; resend to avoid the loop stalling.
693                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
694                        }
695                    }
696                }
697                Ok(None)
698            }
699            0x347773c5 /* pong */ => {
700                // Pong is returned for both internal PingDelayDisconnect (fire-and-forget)
701                // and user-invoked Ping (which has a pending invoke future waiting).
702                // pong layout: CID(4) + msg_id(8) + ping_id(8)
703                // pong.msg_id is the msg_id of the original ping request.
704                // Route back to the caller when it matches the pending sent_msg_id.
705                if body.len() >= 12
706                    && let Some(expected) = sent_msg_id
707                {
708                    let pong_req_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 for pong"));
709                    if pong_req_id == expected {
710                        return Ok(Some(body.to_vec()));
711                    }
712                }
713                // Internal keepalive pong - discard.
714                Ok(None)
715            }
716            0x73f1f8dc /* msg_container */ => {
717                if body.len() < 8 {
718                    return Ok(None);
719                }
720                let count = u32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 for msg_container")) as usize;
721                let mut pos = 8usize;
722                // Do not early-return: containers may bundle new_session_created + rpc_result
723                // together; all items must be processed so session/salt flags are observed.
724                let mut found: Option<Vec<u8>> = None;
725                for _ in 0..count {
726                    if pos + 16 > body.len() { break; }
727                    let inner_bytes =
728                        u32::from_le_bytes(body[pos + 12..pos + 16].try_into().expect("pos+16 <= body.len() checked above")) as usize;
729                    pos += 16;
730                    if pos + inner_bytes > body.len() { break; }
731                    let inner = &body[pos..pos + inner_bytes];
732                    pos += inner_bytes;
733                    if found.is_none() {
734                        if let Some(r) = Self::scan_body(inner, salt, need_resend,
735                            need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
736                            server_msg_id)?
737                        {
738                            found = Some(r);
739                            // Do NOT return  - continue processing remaining items so that
740                            // session/salt flags from co-arriving messages are observed.
741                        }
742                    } else {
743                        // Result already captured; still process remaining items for
744                        // side-effect flags (salt, session reset, bad_msg). Pass
745                        // sent_msg_id so the req_msg_id guard still filters stale
746                        // rpc_results. Passing None would bypass the guard and allow
747                        // a stale response to overwrite `found` on the next iteration.
748                        let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
749                                                bad_msg_code, bad_msg_server_id, sent_msg_id,
750                                                server_msg_id)?;
751                    }
752                }
753                Ok(found)
754            }
755            0x3072cfa1 /* gzip_packed */ => {
756                // Decompress and recurse: server wraps large responses in gzip_packed.
757                if let Some(compressed) = ferogram_connect::tl_read_bytes(&body[4..])
758                    && let Ok(decompressed) = ferogram_connect::gz_inflate(&compressed)
759                    && !decompressed.is_empty()
760                {
761                    return Self::scan_body(
762                        &decompressed, salt,
763                        need_resend, need_session_reset,
764                        bad_msg_code, bad_msg_server_id,
765                        sent_msg_id,
766                        server_msg_id,
767                    );
768                }
769                Ok(None)
770            }
771            _ => Ok(None),
772        }
773    }
774
775    /// Like `rpc_call` but accepts any `Serializable` type (not just `RemoteCall`).
776    pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
777        &mut self,
778        req: &S,
779    ) -> Result<Vec<u8>, InvocationError> {
780        if !self.pending_acks.is_empty() {
781            let ack_body = build_msgs_ack_body(&self.pending_acks);
782            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
783            let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
784            self.pending_acks.clear();
785        }
786        let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
787        Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
788        let mut salt_retries = 0u8;
789        let mut session_resets = 0u8;
790        loop {
791            let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
792            let msg = self
793                .enc
794                .unpack(&mut raw)
795                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
796            self.pending_acks.push(msg.msg_id);
797            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
798                let ack_body = build_msgs_ack_body(&self.pending_acks);
799                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
800                let _ =
801                    Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
802                self.pending_acks.clear();
803            }
804            // Salt updated only on explicit bad_server_salt, not on every message.
805            if msg.body.len() < 4 {
806                return Ok(msg.body);
807            }
808            let mut need_resend = false;
809            let mut need_session_reset = false;
810            let mut bad_msg_code: Option<u32> = None;
811            let mut bad_msg_server_id: Option<i64> = None;
812            // Save result before handling flags; apply all before returning.
813            let scan_result = Self::scan_body(
814                &msg.body,
815                &mut self.enc.salt,
816                &mut need_resend,
817                &mut need_session_reset,
818                &mut bad_msg_code,
819                &mut bad_msg_server_id,
820                Some(sent_msg_id),
821                msg.msg_id,
822            )?;
823            if need_session_reset {
824                session_resets += 1;
825                if session_resets > 2 {
826                    return Err(InvocationError::Deserialize(
827                        "new_session_created (serializable): exceeded 2 resets".into(),
828                    ));
829                }
830                if !self.pending_acks.is_empty() {
831                    let ack_body = build_msgs_ack_body(&self.pending_acks);
832                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
833                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
834                        .await;
835                    self.pending_acks.clear();
836                }
837                if scan_result.is_none() {
838                    let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
839                    sent_msg_id = new_id;
840                    Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
841                }
842            } else if need_resend {
843                match bad_msg_code {
844                    Some(16) | Some(17) => {
845                        if let Some(srv_id) = bad_msg_server_id {
846                            self.enc.correct_time_offset(srv_id);
847                        }
848                        // Do not call undo_seq_no (see rpc_call for explanation).
849                    }
850                    Some(32) | Some(33) => {
851                        self.enc
852                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
853                    }
854                    _ => {
855                        self.enc.undo_seq_no();
856                    }
857                }
858                salt_retries += 1;
859                if salt_retries >= 5 {
860                    return Err(InvocationError::Deserialize(
861                        "bad_server_salt (serializable): exceeded 5 retries".into(),
862                    ));
863                }
864                tracing::debug!(
865                    "[ferogram::sender] resending serializable request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
866                );
867                if !self.pending_acks.is_empty() {
868                    let ack_body = build_msgs_ack_body(&self.pending_acks);
869                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
870                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
871                        .await;
872                    self.pending_acks.clear();
873                }
874                let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
875                sent_msg_id = new_id;
876                Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
877            }
878            if let Some(result) = scan_result {
879                return Ok(result);
880            }
881        }
882    }
883
884    /// Send pre-serialized raw bytes and receive the raw response.
885    /// Used by CDN download connections (no MTProto encryption layer).
886    pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
887        Self::send_abridged(&mut self.stream, body, &mut self.frame_kind).await?;
888        Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await
889    }
890
891    /// Send a framed message using the active FrameKind.
892    /// All transport variants (Abridged, Intermediate, Full, Obfuscated, …) are handled.
893    async fn send_abridged(
894        stream: &mut TcpStream,
895        data: &[u8],
896        kind: &mut FrameKind,
897    ) -> Result<(), InvocationError> {
898        use tokio::io::AsyncWriteExt as _;
899        match kind {
900            FrameKind::Abridged => {
901                let words = data.len() / 4;
902                let mut frame = if words < 0x7f {
903                    let mut v = Vec::with_capacity(1 + data.len());
904                    v.push(words as u8);
905                    v
906                } else {
907                    let mut v = Vec::with_capacity(4 + data.len());
908                    v.extend_from_slice(&[
909                        0x7f,
910                        (words & 0xff) as u8,
911                        ((words >> 8) & 0xff) as u8,
912                        ((words >> 16) & 0xff) as u8,
913                    ]);
914                    v
915                };
916                frame.extend_from_slice(data);
917                stream.write_all(&frame).await?;
918            }
919            FrameKind::Intermediate => {
920                let mut frame = Vec::with_capacity(4 + data.len());
921                frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
922                frame.extend_from_slice(data);
923                stream.write_all(&frame).await?;
924            }
925            FrameKind::Full { send_seqno, .. } => {
926                let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
927                let total_len = (data.len() as u32) + 12;
928                let mut packet = Vec::with_capacity(total_len as usize);
929                packet.extend_from_slice(&total_len.to_le_bytes());
930                packet.extend_from_slice(&seq.to_le_bytes());
931                packet.extend_from_slice(data);
932                let crc = ferogram_connect::crc32_ieee(&packet);
933                packet.extend_from_slice(&crc.to_le_bytes());
934                stream.write_all(&packet).await?;
935            }
936            FrameKind::Obfuscated { cipher } => {
937                let words = data.len() / 4;
938                let mut frame = if words < 0x7f {
939                    let mut v = Vec::with_capacity(1 + data.len());
940                    v.push(words as u8);
941                    v
942                } else {
943                    let mut v = Vec::with_capacity(4 + data.len());
944                    v.extend_from_slice(&[
945                        0x7f,
946                        (words & 0xff) as u8,
947                        ((words >> 8) & 0xff) as u8,
948                        ((words >> 16) & 0xff) as u8,
949                    ]);
950                    v
951                };
952                frame.extend_from_slice(data);
953                cipher.lock().await.encrypt(&mut frame);
954                stream.write_all(&frame).await?;
955            }
956            FrameKind::PaddedIntermediate { cipher } => {
957                let mut pad_len_buf = [0u8; 1];
958                ferogram_crypto::fill_random(&mut pad_len_buf);
959                let pad_len = (pad_len_buf[0] & 0x0f) as usize;
960                let total_payload = data.len() + pad_len;
961                let mut frame = Vec::with_capacity(4 + total_payload);
962                frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
963                frame.extend_from_slice(data);
964                let mut pad = vec![0u8; pad_len];
965                ferogram_crypto::fill_random(&mut pad);
966                frame.extend_from_slice(&pad);
967                cipher.lock().await.encrypt(&mut frame);
968                stream.write_all(&frame).await?;
969            }
970            FrameKind::FakeTls { cipher } => {
971                const TLS_APP_DATA: u8 = 0x17;
972                const TLS_VER: [u8; 2] = [0x03, 0x03];
973                const CHUNK: usize = 2878;
974                let mut locked = cipher.lock().await;
975                for chunk in data.chunks(CHUNK) {
976                    let chunk_len = chunk.len() as u16;
977                    let mut record = Vec::with_capacity(5 + chunk.len());
978                    record.push(TLS_APP_DATA);
979                    record.extend_from_slice(&TLS_VER);
980                    record.extend_from_slice(&chunk_len.to_be_bytes());
981                    record.extend_from_slice(chunk);
982                    locked.encrypt(&mut record[5..]);
983                    stream.write_all(&record).await?;
984                }
985            }
986        }
987        Ok(())
988    }
989
990    /// Receive a framed message using the active FrameKind (with 60-second timeout).
991    async fn recv_abridged(
992        stream: &mut TcpStream,
993        kind: &mut FrameKind,
994    ) -> Result<Vec<u8>, InvocationError> {
995        use tokio::time::{Duration, timeout};
996        const RECV_TIMEOUT: Duration = Duration::from_secs(60);
997
998        macro_rules! tread {
999            ($buf:expr) => {
1000                timeout(RECV_TIMEOUT, stream.read_exact($buf))
1001                    .await
1002                    .map_err(|_| {
1003                        InvocationError::Io(std::io::Error::new(
1004                            std::io::ErrorKind::TimedOut,
1005                            "transfer recv: timeout (60 s)",
1006                        ))
1007                    })??
1008            };
1009        }
1010
1011        match kind {
1012            FrameKind::Abridged => {
1013                let mut h = [0u8; 1];
1014                tread!(&mut h);
1015                let words = if h[0] == 0x7f {
1016                    let mut b = [0u8; 3];
1017                    tread!(&mut b);
1018                    let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
1019                    if w == 1 {
1020                        let mut code_buf = [0u8; 4];
1021                        tread!(&mut code_buf);
1022                        let code = i32::from_le_bytes(code_buf);
1023                        return Err(InvocationError::Rpc(
1024                            crate::errors::RpcError::from_telegram(code, "transport error"),
1025                        ));
1026                    }
1027                    w
1028                } else {
1029                    h[0] as usize
1030                };
1031                let mut buf = vec![0u8; words * 4];
1032                tread!(&mut buf);
1033                if buf.len() == 4 {
1034                    let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1035                    if code < 0 {
1036                        return Err(InvocationError::Rpc(
1037                            crate::errors::RpcError::from_telegram(code, "transport error"),
1038                        ));
1039                    }
1040                }
1041                Ok(buf)
1042            }
1043            FrameKind::Intermediate => {
1044                let mut len_buf = [0u8; 4];
1045                tread!(&mut len_buf);
1046                let len_i32 = i32::from_le_bytes(len_buf);
1047                if len_i32 < 0 {
1048                    return Err(InvocationError::Rpc(
1049                        crate::errors::RpcError::from_telegram(len_i32, "transport error"),
1050                    ));
1051                }
1052                let mut buf = vec![0u8; len_i32 as usize];
1053                tread!(&mut buf);
1054                Ok(buf)
1055            }
1056            FrameKind::Full { recv_seqno, .. } => {
1057                let mut len_buf = [0u8; 4];
1058                tread!(&mut len_buf);
1059                let total_len_i32 = i32::from_le_bytes(len_buf);
1060                if total_len_i32 < 0 {
1061                    return Err(InvocationError::Rpc(
1062                        crate::errors::RpcError::from_telegram(total_len_i32, "transport error"),
1063                    ));
1064                }
1065                let total_len = total_len_i32 as usize;
1066                if total_len < 12 {
1067                    return Err(InvocationError::Deserialize(
1068                        "Full transport: packet too short".into(),
1069                    ));
1070                }
1071                let mut rest = vec![0u8; total_len - 4];
1072                tread!(&mut rest);
1073                let (body, crc_bytes) = rest.split_at(rest.len() - 4);
1074                let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
1075                let mut check_input = Vec::with_capacity(4 + body.len());
1076                check_input.extend_from_slice(&len_buf);
1077                check_input.extend_from_slice(body);
1078                let actual_crc = ferogram_connect::crc32_ieee(&check_input);
1079                if actual_crc != expected_crc {
1080                    return Err(InvocationError::Deserialize(format!(
1081                        "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
1082                    )));
1083                }
1084                let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
1085                let expected_seq = recv_seqno.load(std::sync::atomic::Ordering::Relaxed);
1086                if recv_seq != expected_seq {
1087                    return Err(InvocationError::Deserialize(format!(
1088                        "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
1089                    )));
1090                }
1091                recv_seqno.store(
1092                    expected_seq.wrapping_add(1),
1093                    std::sync::atomic::Ordering::Relaxed,
1094                );
1095                Ok(body[4..].to_vec())
1096            }
1097            FrameKind::Obfuscated { cipher } => {
1098                let mut h = [0u8; 1];
1099                tread!(&mut h);
1100                cipher.lock().await.decrypt(&mut h);
1101                let words = if h[0] == 0x7f {
1102                    let mut b = [0u8; 3];
1103                    tread!(&mut b);
1104                    cipher.lock().await.decrypt(&mut b);
1105                    let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
1106                    if w == 1 {
1107                        let mut code_buf = [0u8; 4];
1108                        tread!(&mut code_buf);
1109                        cipher.lock().await.decrypt(&mut code_buf);
1110                        let code = i32::from_le_bytes(code_buf);
1111                        return Err(InvocationError::Rpc(
1112                            crate::errors::RpcError::from_telegram(code, "transport error"),
1113                        ));
1114                    }
1115                    w
1116                } else {
1117                    h[0] as usize
1118                };
1119                let mut buf = vec![0u8; words * 4];
1120                tread!(&mut buf);
1121                cipher.lock().await.decrypt(&mut buf);
1122                if buf.len() == 4 {
1123                    let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1124                    if code < 0 {
1125                        return Err(InvocationError::Rpc(
1126                            crate::errors::RpcError::from_telegram(code, "transport error"),
1127                        ));
1128                    }
1129                }
1130                Ok(buf)
1131            }
1132            FrameKind::PaddedIntermediate { cipher } => {
1133                let mut len_buf = [0u8; 4];
1134                tread!(&mut len_buf);
1135                cipher.lock().await.decrypt(&mut len_buf);
1136                let total_len = i32::from_le_bytes(len_buf);
1137                if total_len < 0 {
1138                    return Err(InvocationError::Rpc(
1139                        crate::errors::RpcError::from_telegram(total_len, "transport error"),
1140                    ));
1141                }
1142                let mut buf = vec![0u8; total_len as usize];
1143                tread!(&mut buf);
1144                cipher.lock().await.decrypt(&mut buf);
1145                if buf.len() >= 24 {
1146                    let pad = (buf.len() - 24) % 16;
1147                    buf.truncate(buf.len() - pad);
1148                }
1149                Ok(buf)
1150            }
1151            FrameKind::FakeTls { cipher } => {
1152                let mut hdr = [0u8; 5];
1153                tread!(&mut hdr);
1154                if hdr[0] != 0x17 {
1155                    return Err(InvocationError::Deserialize(format!(
1156                        "FakeTLS: unexpected record type 0x{:02x}",
1157                        hdr[0]
1158                    )));
1159                }
1160                let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
1161                let mut buf = vec![0u8; payload_len];
1162                tread!(&mut buf);
1163                cipher.lock().await.decrypt(&mut buf);
1164                Ok(buf)
1165            }
1166        }
1167    }
1168
1169    /// Send a plaintext (DH handshake) frame, padding to 4-byte alignment for
1170    /// abridged-family transports. Full and Intermediate don't need padding.
1171    async fn send_plain_frame(
1172        stream: &mut TcpStream,
1173        data: &[u8],
1174        kind: &mut FrameKind,
1175    ) -> Result<(), InvocationError> {
1176        // Abridged/Obfuscated use word-count (len/4); must be 4-byte aligned.
1177        // Full and Intermediate carry the exact byte length so no padding needed.
1178        let needs_align = matches!(kind, FrameKind::Abridged | FrameKind::Obfuscated { .. });
1179        if needs_align && !data.len().is_multiple_of(4) {
1180            let mut padded = data.to_vec();
1181            let pad = 4 - (data.len() % 4);
1182            padded.resize(data.len() + pad, 0);
1183            Self::send_abridged(stream, &padded, kind).await
1184        } else {
1185            Self::send_abridged(stream, data, kind).await
1186        }
1187    }
1188
1189    async fn recv_plain_frame<T: Deserializable>(
1190        stream: &mut TcpStream,
1191        kind: &mut FrameKind,
1192    ) -> Result<T, InvocationError> {
1193        let raw = Self::recv_abridged(stream, kind).await?;
1194        if raw.len() == 4 {
1195            let code = i32::from_le_bytes(raw[..4].try_into().unwrap());
1196            if code < 0 {
1197                return Err(InvocationError::Deserialize(format!(
1198                    "server transport error during DH: code {code}"
1199                )));
1200            }
1201        }
1202        if raw.len() < 20 {
1203            return Err(InvocationError::Deserialize("plain frame too short".into()));
1204        }
1205        if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1206            return Err(InvocationError::Deserialize(
1207                "expected auth_key_id=0 in plaintext".into(),
1208            ));
1209        }
1210        let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1211        if raw.len() < 20 + body_len {
1212            return Err(InvocationError::Deserialize(format!(
1213                "plain frame truncated: have {} bytes, need {}",
1214                raw.len(),
1215                20 + body_len
1216            )));
1217        }
1218        let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1219        T::deserialize(&mut cur).map_err(Into::into)
1220    }
1221}