// layer_client/lib.rs
1#![doc(html_root_url = "https://docs.rs/layer-client/0.4.0")]
2//! # layer-client
3//!
4//! Production-grade async Telegram client built on MTProto.
5//!
6//! ## Features
7//! - User login (phone + code + 2FA SRP) and bot token login
8//! - Peer access-hash caching — API calls always carry correct access hashes
9//! - `FLOOD_WAIT` auto-retry with configurable policy
10//! - Typed async update stream: `NewMessage`, `MessageEdited`, `MessageDeleted`,
11//!   `CallbackQuery`, `InlineQuery`, `InlineSend`, `Raw`
12//! - Send / edit / delete / forward / pin messages
13//! - Search messages (per-chat and global)
14//! - Mark as read, delete dialogs, clear mentions
15//! - Join chat / accept invite links
16//! - Chat action (typing, uploading, …)
17//! - `get_me()` — fetch own User info
18//! - Paginated dialog and message iterators
19//! - DC migration, session persistence, reconnect
20
21#![deny(unsafe_code)]
22
23mod errors;
24mod retry;
25mod session;
26mod transport;
27mod two_factor_auth;
28pub mod update;
29pub mod parsers;
30pub mod media;
31pub mod participants;
32pub mod pts;
33
34// ── New feature modules ───────────────────────────────────────────────────────
35pub mod dc_pool;
36pub mod transport_obfuscated;
37pub mod transport_intermediate;
38pub mod socks5;
39pub mod session_backend;
40pub mod inline_iter;
41pub mod typing_guard;
42pub mod keyboard;
43pub mod search;
44pub mod types;
45
46#[macro_use]
47pub mod macros;
48
49#[cfg(test)]
50mod pts_tests;
51
52pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
53pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
54pub use update::Update;
55pub use media::{UploadedFile, DownloadIter, Photo, Document, Sticker, Downloadable};
56pub use participants::Participant;
57pub use typing_guard::TypingGuard;
58pub use socks5::Socks5Config;
59pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend, StringSessionBackend};
60pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
61pub use search::{SearchBuilder, GlobalSearchBuilder};
62pub use types::{User, Group, Channel, Chat};
63
64/// Re-export of `layer_tl_types` — generated TL constructors, functions, and enums.
65/// Users can write `use layer_client::tl` instead of adding a separate `layer-tl-types` dep.
66pub use layer_tl_types as tl;
67
68use std::collections::HashMap;
69use std::collections::VecDeque;
70use std::num::NonZeroU32;
71use std::ops::ControlFlow;
72use std::sync::Arc;
73use std::time::Duration;
74
75use layer_mtproto::{EncryptedSession, Session, authentication as auth};
76use layer_tl_types::{Cursor, Deserializable, RemoteCall};
77use session::{DcEntry, PersistedSession};
78use tokio::io::{AsyncReadExt, AsyncWriteExt};
79use tokio::net::TcpStream;
80use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
81use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
82use tokio::time::sleep;
83use tokio_util::sync::CancellationToken;
84use socket2::TcpKeepalive;
85
// ─── MTProto envelope constructor IDs ────────────────────────────────────────
// Constructor IDs of MTProto service-layer frames. These arrive outside the
// generated TL API layer, so the reader task must recognise and dispatch them
// by raw constructor ID.

const ID_RPC_RESULT:       u32 = 0xf35c6d01; // rpc_result — wraps the answer to one RPC call
const ID_RPC_ERROR:        u32 = 0x2144ca19; // rpc_error — error payload inside an rpc_result
const ID_MSG_CONTAINER:    u32 = 0x73f1f8dc; // msg_container — batch of inner messages
const ID_GZIP_PACKED:      u32 = 0x3072cfa1; // gzip_packed — compressed inner payload
const ID_PONG:             u32 = 0x347773c5; // pong — reply to our keepalive ping
const ID_MSGS_ACK:         u32 = 0x62d6b459; // msgs_ack — server acknowledges our msg_ids
const ID_BAD_SERVER_SALT:  u32 = 0xedab447b; // bad_server_salt — re-send with the new salt
const ID_NEW_SESSION:      u32 = 0x9ec20908; // new_session_created
const ID_BAD_MSG_NOTIFY:      u32 = 0xa7eff811; // bad_msg_notification
// G-09: FutureSalts arrives as a bare frame (not inside rpc_result)
const ID_FUTURE_SALTS:        u32 = 0xae500895;
// G-11: server confirms our message was received; we must ack its answer_msg_id
const ID_MSG_DETAILED_INFO:   u32 = 0x276d3ec6;
const ID_MSG_NEW_DETAIL_INFO: u32 = 0x809db6df;
// G-14: server asks us to re-send a specific message
const ID_MSG_RESEND_REQ:      u32 = 0x7d861a08;
// Updates also arrive as bare frames — top-level `Updates` variants below.
const ID_UPDATES:             u32 = 0x74ae4240;
const ID_UPDATE_SHORT:     u32 = 0x78d4dec1;
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
const ID_UPDATE_SHORT_MSG:      u32 = 0x313bc7f8;
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
const ID_UPDATES_TOO_LONG:      u32 = 0xe317af7e;
110
// ─── Keepalive / reconnect tuning ─────────────────────────────────────────────
// All durations are in the unit stated by each constant's name/type.

/// How often to send a keepalive ping.
/// 60 s matches grammers and Telegram Desktop long-poll cadence.
/// At 1 M connections this is 4× less keepalive traffic than 15 s.
const PING_DELAY_SECS: u64 = 60;

/// Tell Telegram to close the connection if it hears nothing for this many
/// seconds.  Must be > PING_DELAY_SECS so a single missed ping doesn't drop us.
/// 75 s = 60 s interval + 15 s slack, matching grammers.
/// (`i32` because the TL ping function takes a signed disconnect delay.)
const NO_PING_DISCONNECT: i32 = 75;

/// Initial backoff before the first reconnect attempt.
const RECONNECT_BASE_MS: u64 = 500;

/// Maximum backoff between reconnect attempts.
/// 5 s cap instead of 30 s — on mobile, network outages are brief and a 30 s
/// sleep means the bot stays dead for up to 30 s after the network returns.
/// Official Telegram mobile clients use a short cap for the same reason.
const RECONNECT_MAX_SECS: u64 = 5;

/// TCP socket-level keepalive: start probes after this many seconds of idle.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
/// Interval between TCP keepalive probes.
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
/// Number of failed probes before the OS declares the connection dead.
const TCP_KEEPALIVE_PROBES: u32 = 3;
138
// ─── PeerCache ────────────────────────────────────────────────────────────────

/// Caches access hashes for users and channels so every API call carries the
/// correct hash without re-resolving peers.
///
/// All fields are `pub` so that `save_session` / `connect` can read/write them
/// directly, and so that advanced callers can inspect the cache.
///
/// A peer that is not in the cache is looked up with access hash `0` by the
/// `*_input_peer` helpers below.
#[derive(Default)]
pub struct PeerCache {
    /// user_id → access_hash
    pub users:    HashMap<i64, i64>,
    /// channel_id → access_hash
    pub channels: HashMap<i64, i64>,
}
153
154impl PeerCache {
155    fn cache_user(&mut self, user: &tl::enums::User) {
156        if let tl::enums::User::User(u) = user
157            && let Some(hash) = u.access_hash {
158            self.users.insert(u.id, hash);
159        }
160    }
161
162    fn cache_chat(&mut self, chat: &tl::enums::Chat) {
163        match chat {
164            tl::enums::Chat::Channel(c) => {
165                if let Some(hash) = c.access_hash {
166                    self.channels.insert(c.id, hash);
167                }
168            }
169            tl::enums::Chat::ChannelForbidden(c) => {
170                self.channels.insert(c.id, c.access_hash);
171            }
172            _ => {}
173        }
174    }
175
176    fn cache_users(&mut self, users: &[tl::enums::User]) {
177        for u in users { self.cache_user(u); }
178    }
179
180    fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
181        for c in chats { self.cache_chat(c); }
182    }
183
184    fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
185        if user_id == 0 {
186            return tl::enums::InputPeer::PeerSelf;
187        }
188        let hash = self.users.get(&user_id).copied().unwrap_or(0);
189        tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
190    }
191
192    fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
193        let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
194        tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
195    }
196
197    fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
198        match peer {
199            tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
200            tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
201                tl::types::InputPeerChat { chat_id: c.chat_id }
202            ),
203            tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
204        }
205    }
206}
207
208// ─── InputMessage builder ─────────────────────────────────────────────────────
209
210/// Builder for composing outgoing messages.
211///
212/// ```rust,no_run
213/// use layer_client::InputMessage;
214///
215/// let msg = InputMessage::text("Hello, *world*!")
216///     .silent(true)
217///     .reply_to(Some(42));
218/// ```
#[derive(Clone, Default)]
pub struct InputMessage {
    /// Message text.
    pub text:         String,
    /// Message ID to reply to, if any.
    pub reply_to:     Option<i32>,
    /// Deliver without a notification sound.
    pub silent:       bool,
    /// Send as a background message.
    pub background:   bool,
    /// Clear the chat's draft after sending.
    pub clear_draft:  bool,
    /// Suppress the link preview.
    pub no_webpage:   bool,
    /// Show media above the caption instead of below (Telegram ≥ 10.3).
    pub invert_media: bool,
    /// Schedule to send when the user goes online (`schedule_date = 0x7FFFFFFE`).
    pub schedule_once_online: bool,
    /// Formatting entities (bold, italic, links, …) applied to `text`.
    pub entities:     Option<Vec<tl::enums::MessageEntity>>,
    /// Inline or reply keyboard attached to the message.
    pub reply_markup: Option<tl::enums::ReplyMarkup>,
    /// Unix timestamp for scheduled sending.
    pub schedule_date: Option<i32>,
    /// Attached media to send alongside the message (G-45).
    /// Use [`InputMessage::copy_media`] to attach media copied from an existing message.
    pub media: Option<tl::enums::InputMedia>,
}
238
239impl InputMessage {
240    /// Create a message with the given text.
241    pub fn text(text: impl Into<String>) -> Self {
242        Self { text: text.into(), ..Default::default() }
243    }
244
245    /// Set the message text.
246    pub fn set_text(mut self, text: impl Into<String>) -> Self {
247        self.text = text.into(); self
248    }
249
250    /// Reply to a specific message ID.
251    pub fn reply_to(mut self, id: Option<i32>) -> Self {
252        self.reply_to = id; self
253    }
254
255    /// Send silently (no notification sound).
256    pub fn silent(mut self, v: bool) -> Self {
257        self.silent = v; self
258    }
259
260    /// Send in background.
261    pub fn background(mut self, v: bool) -> Self {
262        self.background = v; self
263    }
264
265    /// Clear the draft after sending.
266    pub fn clear_draft(mut self, v: bool) -> Self {
267        self.clear_draft = v; self
268    }
269
270    /// Disable link preview.
271    pub fn no_webpage(mut self, v: bool) -> Self {
272        self.no_webpage = v; self
273    }
274
275    /// Show media above the caption rather than below (requires Telegram ≥ 10.3).
276    pub fn invert_media(mut self, v: bool) -> Self {
277        self.invert_media = v; self
278    }
279
280    /// Schedule the message to be sent when the recipient comes online.
281    ///
282    /// Mutually exclusive with `schedule_date` — calling this last wins.
283    /// Uses the Telegram magic value `0x7FFFFFFE`.
284    pub fn schedule_once_online(mut self) -> Self {
285        self.schedule_once_online = true;
286        self.schedule_date = None;
287        self
288    }
289
290    /// Attach formatting entities (bold, italic, code, links, etc).
291    pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
292        self.entities = Some(e); self
293    }
294
295    /// Attach a reply markup (inline or reply keyboard).
296    pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
297        self.reply_markup = Some(rm); self
298    }
299
300    /// Shorthand for attaching an [`crate::keyboard::InlineKeyboard`].
301    ///
302    /// ```rust,no_run
303    /// use layer_client::{InputMessage, keyboard::{InlineKeyboard, Button}};
304    ///
305    /// let msg = InputMessage::text("Pick one:")
306    ///     .keyboard(InlineKeyboard::new()
307    ///         .row([Button::callback("A", b"a"), Button::callback("B", b"b")]));
308    /// ```
309    pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
310        self.reply_markup = Some(kb.into()); self
311    }
312
313    /// Schedule the message for a future Unix timestamp.
314    pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
315        self.schedule_date = ts; self
316    }
317
318    /// Attach media copied from an existing message (G-45).
319    ///
320    /// Pass the `InputMedia` obtained from [`crate::media::Photo`],
321    /// [`crate::media::Document`], or directly from a raw `MessageMedia`.
322    ///
323    /// When a `media` is set, the message is sent via `messages.SendMedia`
324    /// instead of `messages.SendMessage`.
325    ///
326    /// ```rust,no_run
327    /// # use layer_client::{InputMessage, tl};
328    /// # fn example(media: tl::enums::InputMedia) {
329    /// let msg = InputMessage::text("Here is the file again")
330    ///     .copy_media(media);
331    /// # }
332    /// ```
333    pub fn copy_media(mut self, media: tl::enums::InputMedia) -> Self {
334        self.media = Some(media); self
335    }
336
337    /// Remove any previously attached media.
338    pub fn clear_media(mut self) -> Self {
339        self.media = None; self
340    }
341
342    fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
343        self.reply_to.map(|id| {
344            tl::enums::InputReplyTo::Message(
345                tl::types::InputReplyToMessage {
346                    reply_to_msg_id: id,
347                    top_msg_id: None,
348                    reply_to_peer_id: None,
349                    quote_text: None,
350                    quote_entities: None,
351                    quote_offset: None,
352                    monoforum_peer_id: None,
353                    todo_item_id: None,
354                    poll_option: None,
355                }
356            )
357        })
358    }
359}
360
361impl From<&str> for InputMessage {
362    fn from(s: &str) -> Self { Self::text(s) }
363}
364
365impl From<String> for InputMessage {
366    fn from(s: String) -> Self { Self::text(s) }
367}
368
// ─── TransportKind ────────────────────────────────────────────────────────────

/// Which MTProto transport framing to use for all connections.
///
/// | Variant | Init bytes | Notes |
/// |---------|-----------|-------|
/// | `Abridged` | `0xef` | Default, smallest overhead |
/// | `Intermediate` | `0xeeeeeeee` | Better proxy compat |
/// | `Full` | none | Adds seqno + CRC32 |
/// | `Obfuscated` | random 64B | Bypasses DPI / MTProxy |
#[derive(Clone, Debug, Default)]
pub enum TransportKind {
    /// MTProto [Abridged] transport — length prefix is 1 or 4 bytes.
    ///
    /// [Abridged]: https://core.telegram.org/mtproto/mtproto-transports#abridged
    #[default]
    Abridged,
    /// MTProto [Intermediate] transport — 4-byte LE length prefix.
    ///
    /// [Intermediate]: https://core.telegram.org/mtproto/mtproto-transports#intermediate
    Intermediate,
    /// MTProto [Full] transport — 4-byte length + seqno + CRC32.
    ///
    /// [Full]: https://core.telegram.org/mtproto/mtproto-transports#full
    Full,
    /// [Obfuscated2] transport — XOR stream cipher over Abridged framing.
    /// Required for MTProxy and networks with deep-packet inspection.
    ///
    /// `secret` is the 16-byte proxy secret, or `None` for keyless obfuscation.
    ///
    /// [Obfuscated2]: https://core.telegram.org/mtproto/mtproto-transports#obfuscated-2
    Obfuscated { secret: Option<[u8; 16]> },
}
402
// ─── Config ───────────────────────────────────────────────────────────────────

/// A token that can be used to gracefully shut down a [`Client`].
///
/// This is an alias for `tokio_util::sync::CancellationToken`, so it can be
/// cloned freely and cancelled from any task or signal handler.
///
/// Obtained from [`Client::connect`] — call [`ShutdownToken::cancel`] to begin
/// graceful shutdown. All pending requests will finish and the reader task will
/// exit cleanly.
///
/// # Example
/// ```rust,no_run
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use layer_client::{Client, Config, ShutdownToken};
///
/// let (client, shutdown) = Client::connect(Config::default()).await?;
///
/// // In a signal handler or background task:
/// // shutdown.cancel();
/// # Ok(()) }
/// ```
pub type ShutdownToken = CancellationToken;
423
/// Configuration for [`Client::connect`].
#[derive(Clone)]
pub struct Config {
    /// Telegram application API ID.
    pub api_id:         i32,
    /// Telegram application API hash.
    pub api_hash:       String,
    /// Optional explicit DC address.
    /// NOTE(review): precedence vs. the saved session's DC is decided inside
    /// `connect` — confirm there before relying on it.
    pub dc_addr:        Option<String>,
    /// Retry policy applied to failed calls (e.g. `FLOOD_WAIT` auto-retry).
    pub retry_policy:   Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy — every Telegram connection is tunnelled through it.
    pub socks5:         Option<crate::socks5::Socks5Config>,
    /// Allow IPv6 DC addresses when populating the DC table (default: false).
    pub allow_ipv6:     bool,
    /// Which MTProto transport framing to use (default: Abridged).
    pub transport:      TransportKind,
    /// Session persistence backend (default: binary file `"layer.session"`).
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// If `true`, replay missed updates via `updates.getDifference` immediately
    /// after connecting. Mirrors grammers' `UpdatesConfiguration { catch_up: true }`.
    /// Default: `false`.
    pub catch_up:       bool,
}
444
445impl Config {
446    /// Convenience builder: use a portable base64 string session.
447    ///
448    /// Pass the string exported from a previous `client.export_session_string()` call,
449    /// or an empty string to start fresh (the string session will be populated after auth).
450    ///
451    /// # Example
452    /// ```rust,no_run
453    /// let cfg = Config {
454    ///     api_id:   12345,
455    ///     api_hash: "abc".into(),
456    ///     catch_up: true,
457    ///     ..Config::with_string_session(std::env::var("SESSION").unwrap_or_default())
458    /// };
459    /// ```
460    pub fn with_string_session(s: impl Into<String>) -> Self {
461        Config {
462            session_backend: Arc::new(
463                crate::session_backend::StringSessionBackend::new(s)
464            ),
465            ..Config::default()
466        }
467    }
468}
469
470impl Default for Config {
471    fn default() -> Self {
472        Self {
473            api_id:          0,
474            api_hash:        String::new(),
475            dc_addr:         None,
476            retry_policy:    Arc::new(AutoSleep::default()),
477            socks5:          None,
478            allow_ipv6:      false,
479            transport:       TransportKind::Abridged,
480            session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
481            catch_up:        false,
482        }
483    }
484}
485
// ─── UpdateStream ─────────────────────────────────────────────────────────────
// G-56: UpdateStream lives here; next_raw() added.

/// Asynchronous stream of [`Update`]s.
///
/// Yields `None` once the client has disconnected and the sending side of the
/// internal channel is dropped.
pub struct UpdateStream {
    // Receiving half of the client's unbounded update channel.
    rx: mpsc::UnboundedReceiver<update::Update>,
}
493
494impl UpdateStream {
495    /// Wait for the next update. Returns `None` when the client has disconnected.
496    pub async fn next(&mut self) -> Option<update::Update> {
497        self.rx.recv().await
498    }
499
500    /// Wait for the next **raw** (unrecognised) update frame, skipping all
501    /// typed high-level variants. Useful for handling constructor IDs that
502    /// `layer-client` does not yet wrap — dispatch on `constructor_id` yourself.
503    ///
504    /// Returns `None` when the client has disconnected.
505    pub async fn next_raw(&mut self) -> Option<update::RawUpdate> {
506        loop {
507            match self.rx.recv().await? {
508                update::Update::Raw(r) => return Some(r),
509                _ => continue,
510            }
511        }
512    }
513}
514
// ─── Dialog ───────────────────────────────────────────────────────────────────

/// A Telegram dialog (chat, user, channel).
#[derive(Debug, Clone)]
pub struct Dialog {
    /// Raw TL dialog — either `Dialog` or `Folder`.
    pub raw:     tl::enums::Dialog,
    /// Associated message, when the server provided one.
    pub message: Option<tl::enums::Message>,
    /// User entity for one-to-one dialogs, when available.
    pub entity:  Option<tl::enums::User>,
    /// Chat/channel entity for group and channel dialogs, when available.
    pub chat:    Option<tl::enums::Chat>,
}
525
526impl Dialog {
527    /// The dialog's display title.
528    pub fn title(&self) -> String {
529        if let Some(tl::enums::User::User(u)) = &self.entity {
530            let first = u.first_name.as_deref().unwrap_or("");
531            let last  = u.last_name.as_deref().unwrap_or("");
532            let name  = format!("{first} {last}").trim().to_string();
533            if !name.is_empty() { return name; }
534        }
535        if let Some(chat) = &self.chat {
536            return match chat {
537                tl::enums::Chat::Chat(c)         => c.title.clone(),
538                tl::enums::Chat::Forbidden(c) => c.title.clone(),
539                tl::enums::Chat::Channel(c)      => c.title.clone(),
540                tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
541                tl::enums::Chat::Empty(_)        => "(empty)".into(),
542            };
543        }
544        "(Unknown)".to_string()
545    }
546
547    /// Peer of this dialog.
548    pub fn peer(&self) -> Option<&tl::enums::Peer> {
549        match &self.raw {
550            tl::enums::Dialog::Dialog(d) => Some(&d.peer),
551            tl::enums::Dialog::Folder(_) => None,
552        }
553    }
554
555    /// Unread message count.
556    pub fn unread_count(&self) -> i32 {
557        match &self.raw {
558            tl::enums::Dialog::Dialog(d) => d.unread_count,
559            _ => 0,
560        }
561    }
562
563    /// ID of the top message.
564    pub fn top_message(&self) -> i32 {
565        match &self.raw {
566            tl::enums::Dialog::Dialog(d) => d.top_message,
567            _ => 0,
568        }
569    }
570}
571
// ─── ClientInner ─────────────────────────────────────────────────────────────

/// Shared state behind [`Client`] — always accessed through an `Arc`.
struct ClientInner {
    /// Write half of the connection — holds the EncryptedSession (for packing)
    /// and the send half of the TCP stream. The read half is owned by the
    /// reader task started in connect().
    writer:          Mutex<ConnectionWriter>,
    /// Pending RPC replies, keyed by MTProto msg_id.
    /// RPC callers insert a oneshot::Sender here before sending; the reader
    /// task routes incoming rpc_result frames to the matching sender.
    #[allow(clippy::type_complexity)]
    pending:         Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Channel used to hand a new (OwnedReadHalf, FrameKind, auth_key, session_id)
    /// to the reader task after a reconnect.
    reconnect_tx:    mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Send `()` here to wake the reader's reconnect backoff loop immediately.
    /// Used by [`Client::signal_network_restored`].
    network_hint_tx: mpsc::UnboundedSender<()>,
    /// Cancelled to signal graceful shutdown to the reader task.
    #[allow(dead_code)]
    shutdown_token:  CancellationToken,
    /// Whether to replay missed updates via getDifference on connect.
    #[allow(dead_code)]
    catch_up:        bool,
    /// Home DC ID; rewritten when `connect` falls back to a fresh handshake.
    home_dc_id:      Mutex<i32>,
    /// Known DC entries (address, auth key, salt, time offset) keyed by DC ID.
    dc_options:      Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache; restored from the persisted session on connect.
    pub peer_cache:    RwLock<PeerCache>,
    /// Update sequence state (pts/qts/date/seq and per-channel pts).
    pub pts_state:     Mutex<pts::PtsState>,
    /// G-17: Buffer for updates received during a possible-gap window.
    pub possible_gap:  Mutex<pts::PossibleGapBuffer>,
    /// API credentials taken from [`Config`].
    api_id:          i32,
    api_hash:        String,
    /// Retry policy for failed calls (see [`Config::retry_policy`]).
    retry_policy:    Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy config, reused on reconnect.
    socks5:          Option<crate::socks5::Socks5Config>,
    allow_ipv6:      bool,
    /// Transport framing, reused on reconnect.
    transport:       TransportKind,
    /// Session persistence backend from [`Config`].
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    dc_pool:         Mutex<dc_pool::DcPool>,
    /// Sender side of the update channel consumed by [`UpdateStream`].
    update_tx:       mpsc::UnboundedSender<update::Update>,
}
612
/// The main Telegram client. Cheap to clone — internally Arc-wrapped.
#[derive(Clone)]
pub struct Client {
    pub(crate) inner: Arc<ClientInner>,
    // NOTE(review): presumably held so the update channel's receiver stays
    // alive until a consumer takes it — confirm against the stream accessor.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
619
620impl Client {
621    // ── Connect ────────────────────────────────────────────────────────────
622
623    pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
624        let (update_tx, update_rx) = mpsc::unbounded_channel();
625
626        // ── Load or fresh-connect ───────────────────────────────────────
627        let socks5    = config.socks5.clone();
628        let transport = config.transport.clone();
629
630        let (conn, home_dc_id, dc_opts, loaded_session) =
631            match config.session_backend.load()
632                .map_err(InvocationError::Io)?
633            {
634                Some(s) => {
635                    if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
636                        if let Some(key) = dc.auth_key {
637                            log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
638                            match Connection::connect_with_key(
639                                &dc.addr, key, dc.first_salt, dc.time_offset,
640                                socks5.as_ref(), &transport,
641                            ).await {
642                                Ok(c) => {
643                                    let mut opts = session::default_dc_addresses()
644                                        .into_iter()
645                                        .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
646                                        .collect::<HashMap<_, _>>();
647                                    for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
648                                    (c, s.home_dc_id, opts, Some(s))
649                                }
650                                Err(e) => {
651                                    log::warn!("[layer] Session connect failed ({e}), fresh connect …");
652                                    let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
653                                    (c, dc, opts, None)
654                                }
655                            }
656                        } else {
657                            let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
658                            (c, dc, opts, None)
659                        }
660                    } else {
661                        let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
662                        (c, dc, opts, None)
663                    }
664                }
665                None => {
666                    let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
667                    (c, dc, opts, None)
668                }
669            };
670
671        // ── Build DC pool ───────────────────────────────────────────────
672        let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());
673
674        // Split the TCP stream immediately.
675        // The writer (write half + EncryptedSession) stays in ClientInner.
676        // The read half goes to the reader task which we spawn right now so
677        // that RPC calls during init_connection work correctly.
678        let (writer, read_half, frame_kind) = conn.into_writer();
679        let auth_key   = writer.enc.auth_key_bytes();
680        let session_id = writer.enc.session_id();
681
682        #[allow(clippy::type_complexity)]
683        let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
684            Arc::new(Mutex::new(HashMap::new()));
685
686        // Channel the reconnect logic uses to hand a new read half to the reader task.
687        let (reconnect_tx, reconnect_rx) =
688            mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();
689
690        // Channel for external "network restored" hints — lets Android/iOS callbacks
691        // skip the reconnect backoff and attempt immediately.
692        let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();
693
694
695        // Graceful shutdown token — cancel this to stop the reader task cleanly.
696        let shutdown_token = CancellationToken::new();
697        let catch_up = config.catch_up;
698
699        let inner = Arc::new(ClientInner {
700            writer:          Mutex::new(writer),
701            pending:         pending.clone(),
702            reconnect_tx,
703            network_hint_tx,
704            shutdown_token:  shutdown_token.clone(),
705            catch_up,
706            home_dc_id:      Mutex::new(home_dc_id),
707            dc_options:      Mutex::new(dc_opts),
708            peer_cache:      RwLock::new(PeerCache::default()),
709            pts_state:       Mutex::new(pts::PtsState::default()),
710            possible_gap:    Mutex::new(pts::PossibleGapBuffer::new()),
711            api_id:          config.api_id,
712            api_hash:        config.api_hash,
713            retry_policy:    config.retry_policy,
714            socks5:          config.socks5,
715            allow_ipv6:      config.allow_ipv6,
716            transport:       config.transport,
717            session_backend: config.session_backend,
718            dc_pool:         Mutex::new(pool),
719            update_tx,
720        });
721
722        let client = Self {
723            inner,
724            _update_rx: Arc::new(Mutex::new(update_rx)),
725        };
726
727        // Spawn the reader task immediately so that RPC calls during
728        // init_connection can receive their responses.
729        {
730            let client_r = client.clone();
731            let shutdown_r = shutdown_token.clone();
732            tokio::spawn(async move {
733                client_r.run_reader_task(
734                    read_half, frame_kind, auth_key, session_id,
735                    reconnect_rx, network_hint_rx, shutdown_r,
736                ).await;
737            });
738        }
739
740        // If init_connection fails (e.g. stale auth key rejected by Telegram),
741        // do a fresh DH handshake and retry once.
742        if let Err(e) = client.init_connection().await {
743            log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");
744
745            let socks5_r    = client.inner.socks5.clone();
746            let transport_r = client.inner.transport.clone();
747            let (new_conn, new_dc_id, new_opts) =
748                Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;
749
750            {
751                let mut dc_guard = client.inner.home_dc_id.lock().await;
752                *dc_guard = new_dc_id;
753            }
754            {
755                let mut opts_guard = client.inner.dc_options.lock().await;
756                *opts_guard = new_opts;
757            }
758
759            // Replace writer and hand new read half to reader task
760            let (new_writer, new_read, new_fk) = new_conn.into_writer();
761            let new_ak = new_writer.enc.auth_key_bytes();
762            let new_sid = new_writer.enc.session_id();
763            *client.inner.writer.lock().await = new_writer;
764            let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));
765
766            client.init_connection().await?;
767        }
768
769        // ── Restore peer access-hash cache from session ─────────────────
770        if let Some(ref s) = loaded_session
771            && !s.peers.is_empty() {
772            let mut cache = client.inner.peer_cache.write().await;
773            for p in &s.peers {
774                if p.is_channel {
775                    cache.channels.entry(p.id).or_insert(p.access_hash);
776                } else {
777                    cache.users.entry(p.id).or_insert(p.access_hash);
778                }
779            }
780            log::debug!("[layer] Peer cache restored: {} users, {} channels",
781                cache.users.len(), cache.channels.len());
782        }
783
784        // ── Restore update state / catch-up ─────────────────────────────
785        //
786        // Two modes:
787        //   catch_up=false → always call sync_pts_state() so we start from
788        //                    the current server state (ignore saved pts).
789        //   catch_up=true  → if we have a saved pts > 0, restore it and let
790        //                    get_difference() fetch what we missed.  Only fall
791        //                    back to sync_pts_state() when there is no saved
792        //                    state (first boot, or fresh session).
793        let has_saved_state = loaded_session
794            .as_ref()
795            .is_some_and(|s| s.updates_state.is_initialised());
796
797        if catch_up && has_saved_state {
798            let snap = &loaded_session.as_ref().unwrap().updates_state;
799            let mut state = client.inner.pts_state.lock().await;
800            state.pts  = snap.pts;
801            state.qts  = snap.qts;
802            state.date = snap.date;
803            state.seq  = snap.seq;
804            for &(cid, cpts) in &snap.channels {
805                state.channel_pts.insert(cid, cpts);
806            }
807            log::info!("[layer] Update state restored: pts={}, qts={}, seq={}, {} channels",
808                state.pts, state.qts, state.seq, state.channel_pts.len());
809            drop(state);
810
811            // Capture channel list before spawn — get_difference() resets
812            // PtsState via from_server_state (channel_pts preserved now, but
813            // we need the IDs to drive per-channel catch-up regardless).
814            let channel_ids: Vec<i64> = snap.channels.iter().map(|&(cid, _)| cid).collect();
815
816            // Now spawn the catch-up diff — pts is the *old* value, so
817            // getDifference will return exactly what we missed.
818            let c   = client.clone();
819            let utx = client.inner.update_tx.clone();
820            tokio::spawn(async move {
821                // ── 1. Global getDifference ───────────────────────────────
822                match c.get_difference().await {
823                    Ok(missed) => {
824                        log::info!("[layer] catch_up: {} global updates replayed", missed.len());
825                        for u in missed { let _ = utx.send(u); }
826                    }
827                    Err(e) => log::warn!("[layer] catch_up getDifference: {e}"),
828                }
829
830                // ── 2. Per-channel getChannelDifference ───────────────────
831                // Spawn a task per channel so they run concurrently.
832                // channel_pts were preserved through the global reset above.
833                if !channel_ids.is_empty() {
834                    log::info!("[layer] catch_up: per-channel diff for {} channels", channel_ids.len());
835                    for channel_id in channel_ids {
836                        let c2   = c.clone();
837                        let utx2 = utx.clone();
838                        tokio::spawn(async move {
839                            match c2.get_channel_difference(channel_id).await {
840                                Ok(updates) => {
841                                    if !updates.is_empty() {
842                                        log::info!("[layer] catch_up channel {channel_id}: {} updates", updates.len());
843                                    }
844                                    for u in updates { let _ = utx2.send(u); }
845                                }
846                                Err(e) => log::warn!("[layer] catch_up channel {channel_id}: {e}"),
847                            }
848                        });
849                    }
850                }
851            });
852        } else {
853            // No saved state or catch_up disabled — sync from server.
854            let _ = client.sync_pts_state().await;
855        }
856
857        Ok((client, shutdown_token))
858    }
859
860    async fn fresh_connect(
861        socks5:    Option<&crate::socks5::Socks5Config>,
862        transport: &TransportKind,
863    ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
864        log::info!("[layer] Fresh connect to DC2 …");
865        let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
866        let opts = session::default_dc_addresses()
867            .into_iter()
868            .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
869            .collect();
870        Ok((conn, 2, opts))
871    }
872
873    // ── Session ────────────────────────────────────────────────────────────
874
875    pub async fn save_session(&self) -> Result<(), InvocationError> {
876        use session::{CachedPeer, UpdatesStateSnap};
877
878        let writer_guard = self.inner.writer.lock().await;
879        let home_dc_id   = *self.inner.home_dc_id.lock().await;
880        let dc_options   = self.inner.dc_options.lock().await;
881
882        let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
883            dc_id:       e.dc_id,
884            addr:        e.addr.clone(),
885            auth_key:    if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
886            first_salt:  if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
887            time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
888        }).collect();
889        self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
890
891        // Snapshot current pts state
892        let pts_snap = {
893            let s = self.inner.pts_state.lock().await;
894            UpdatesStateSnap {
895                pts:      s.pts,
896                qts:      s.qts,
897                date:     s.date,
898                seq:      s.seq,
899                channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
900            }
901        };
902
903        // Snapshot peer access-hash cache
904        let peers: Vec<CachedPeer> = {
905            let cache = self.inner.peer_cache.read().await;
906            let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
907            for (&id, &hash) in &cache.users    { v.push(CachedPeer { id, access_hash: hash, is_channel: false }); }
908            for (&id, &hash) in &cache.channels { v.push(CachedPeer { id, access_hash: hash, is_channel: true  }); }
909            v
910        };
911
912        self.inner.session_backend
913            .save(&PersistedSession { home_dc_id, dcs, updates_state: pts_snap, peers })
914            .map_err(InvocationError::Io)?;
915        log::info!("[layer] Session saved ✓");
916        Ok(())
917    }
918
    /// Export the current session as a portable URL-safe base64 string.
    ///
    /// The returned string encodes the auth key, DC table, update state, and
    /// peer access-hash cache. Store it in an environment variable or secret
    /// manager and pass it back via [`Config::with_string_session`] to restore
    /// the session without re-authenticating.
    ///
    /// Unlike [`save_session`], this does NOT write anything to the session
    /// backend: the snapshot is assembled in memory and serialized directly.
    /// Call [`save_session`] separately if you also want persisted state
    /// refreshed.
    pub async fn export_session_string(&self) -> Result<String, InvocationError> {
        use session::{CachedPeer, UpdatesStateSnap};

        // Lock order matches save_session: writer → home_dc_id → dc_options.
        let writer_guard = self.inner.writer.lock().await;
        let home_dc_id   = *self.inner.home_dc_id.lock().await;
        let dc_options   = self.inner.dc_options.lock().await;

        // Home DC entry carries the writer's live auth key / salt / offset;
        // all other DCs keep their recorded values.
        let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
            dc_id:       e.dc_id,
            addr:        e.addr.clone(),
            auth_key:    if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
            first_salt:  if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
            time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
        }).collect();
        self.inner.dc_pool.lock().await.collect_keys(&mut dcs);

        // Snapshot the current update (pts) state.
        let pts_snap = {
            let s = self.inner.pts_state.lock().await;
            UpdatesStateSnap {
                pts:      s.pts,
                qts:      s.qts,
                date:     s.date,
                seq:      s.seq,
                channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
            }
        };

        // Snapshot the peer access-hash cache (users, then channels).
        let peers: Vec<CachedPeer> = {
            let cache = self.inner.peer_cache.read().await;
            let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
            for (&id, &hash) in &cache.users    { v.push(CachedPeer { id, access_hash: hash, is_channel: false }); }
            for (&id, &hash) in &cache.channels { v.push(CachedPeer { id, access_hash: hash, is_channel: true  }); }
            v
        };

        // PersistedSession's Display impl produces the portable string form.
        let session = PersistedSession { home_dc_id, dcs, updates_state: pts_snap, peers };
        Ok(session.to_string())
    }
966
967
968
969    /// Returns `true` if the client is already authorized.
970    pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
971        match self.invoke(&tl::functions::updates::GetState {}).await {
972            Ok(_)  => Ok(true),
973            Err(e) if e.is("AUTH_KEY_UNREGISTERED")
974                   || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
975            Err(e) => Err(e),
976        }
977    }
978
979    /// Sign in as a bot.
980    pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
981        let req = tl::functions::auth::ImportBotAuthorization {
982            flags: 0, api_id: self.inner.api_id,
983            api_hash: self.inner.api_hash.clone(),
984            bot_auth_token: token.to_string(),
985        };
986
987        let result = match self.invoke(&req).await {
988            Ok(x) => x,
989            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
990                let dc_id = r.value.unwrap_or(2) as i32;
991                self.migrate_to(dc_id).await?;
992                self.invoke(&req).await?
993            }
994            Err(e) => return Err(e),
995        };
996
997        let name = match result {
998            tl::enums::auth::Authorization::Authorization(a) => {
999                self.cache_user(&a.user).await;
1000                Self::extract_user_name(&a.user)
1001            }
1002            tl::enums::auth::Authorization::SignUpRequired(_) => {
1003                panic!("unexpected SignUpRequired during bot sign-in")
1004            }
1005        };
1006        log::info!("[layer] Bot signed in ✓  ({name})");
1007        Ok(name)
1008    }
1009
1010    /// Request a login code for a user account.
1011    pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
1012        use tl::enums::auth::SentCode;
1013
1014        let req = self.make_send_code_req(phone);
1015        let body = match self.rpc_call_raw(&req).await {
1016            Ok(b) => b,
1017            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
1018                let dc_id = r.value.unwrap_or(2) as i32;
1019                self.migrate_to(dc_id).await?;
1020                self.rpc_call_raw(&req).await?
1021            }
1022            Err(e) => return Err(e),
1023        };
1024
1025        let mut cur = Cursor::from_slice(&body);
1026        let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
1027            SentCode::SentCode(s) => s.phone_code_hash,
1028            SentCode::Success(_)  => return Err(InvocationError::Deserialize("unexpected Success".into())),
1029            SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
1030        };
1031        log::info!("[layer] Login code sent");
1032        Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
1033    }
1034
1035    /// Complete sign-in with the code sent to the phone.
1036    pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
1037        let req = tl::functions::auth::SignIn {
1038            phone_number:    token.phone.clone(),
1039            phone_code_hash: token.phone_code_hash.clone(),
1040            phone_code:      Some(code.trim().to_string()),
1041            email_verification: None,
1042        };
1043
1044        let body = match self.rpc_call_raw(&req).await {
1045            Ok(b) => b,
1046            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
1047                let dc_id = r.value.unwrap_or(2) as i32;
1048                self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
1049                self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
1050            }
1051            Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
1052                let t = self.get_password_info().await.map_err(SignInError::Other)?;
1053                return Err(SignInError::PasswordRequired(Box::new(t)));
1054            }
1055            Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
1056            Err(e) => return Err(SignInError::Other(e)),
1057        };
1058
1059        let mut cur = Cursor::from_slice(&body);
1060        match tl::enums::auth::Authorization::deserialize(&mut cur)
1061            .map_err(|e| SignInError::Other(e.into()))?
1062        {
1063            tl::enums::auth::Authorization::Authorization(a) => {
1064                self.cache_user(&a.user).await;
1065                let name = Self::extract_user_name(&a.user);
1066                log::info!("[layer] Signed in ✓  Welcome, {name}!");
1067                Ok(name)
1068            }
1069            tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
1070        }
1071    }
1072
    /// Complete 2FA login.
    ///
    /// Runs Telegram's SRP (Secure Remote Password) check: extracts the
    /// server-supplied parameters from `token`, computes the client proof
    /// from them plus the cleartext `password`, and submits it via
    /// `auth.checkPassword`. Returns the signed-in user's display name.
    ///
    /// # Errors
    /// Returns `InvocationError::Deserialize` when the password descriptor is
    /// missing a required SRP field (`current_algo`, `srp_b`, `srp_id`) or
    /// when the server unexpectedly answers `SignUpRequired`; any RPC failure
    /// from `auth.checkPassword` is propagated as-is.
    pub async fn check_password(
        &self,
        token:    PasswordToken,
        password: impl AsRef<[u8]>,
    ) -> Result<String, InvocationError> {
        // Pull the SRP inputs out of the account.Password descriptor; each
        // of these fields is mandatory for the handshake.
        let pw   = token.password;
        let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
        let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
        let g_b  = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
        // NOTE(review): `a` is taken from the server-provided secure_random
        // field and fed to calculate_2fa as the client ephemeral — confirm
        // against two_factor_auth's contract.
        let a    = pw.secure_random;
        let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;

        // m1 = client proof, g_a = client public value sent to the server.
        let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
        let req = tl::functions::auth::CheckPassword {
            password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
                tl::types::InputCheckPasswordSrp {
                    srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
                },
            ),
        };

        let body = self.rpc_call_raw(&req).await?;
        let mut cur = Cursor::from_slice(&body);
        match tl::enums::auth::Authorization::deserialize(&mut cur)? {
            tl::enums::auth::Authorization::Authorization(a) => {
                // Cache the authorized user so later calls have its access hash.
                self.cache_user(&a.user).await;
                let name = Self::extract_user_name(&a.user);
                log::info!("[layer] 2FA ✓  Welcome, {name}!");
                Ok(name)
            }
            tl::enums::auth::Authorization::SignUpRequired(_) =>
                Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
        }
    }
1108
1109    /// Sign out and invalidate the current session.
1110    pub async fn sign_out(&self) -> Result<bool, InvocationError> {
1111        let req = tl::functions::auth::LogOut {};
1112        match self.rpc_call_raw(&req).await {
1113            Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
1114            Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
1115            Err(e) => Err(e),
1116        }
1117    }
1118
1119    // ── Get self ───────────────────────────────────────────────────────────
1120
1121    /// Fetch information about the logged-in user.
1122    pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
1123        let req = tl::functions::users::GetUsers {
1124            id: vec![tl::enums::InputUser::UserSelf],
1125        };
1126        let body    = self.rpc_call_raw(&req).await?;
1127        let mut cur = Cursor::from_slice(&body);
1128        let users   = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1129        self.cache_users_slice(&users).await;
1130        users.into_iter().find_map(|u| match u {
1131            tl::enums::User::User(u) => Some(u),
1132            _ => None,
1133        }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
1134    }
1135
1136    // ── Updates ────────────────────────────────────────────────────────────
1137
1138    /// Return an [`UpdateStream`] that yields incoming [`Update`]s.
1139    ///
1140    /// The reader task (started inside `connect()`) sends all updates to
1141    /// `inner.update_tx`. This method proxies those updates into a fresh
1142    /// caller-owned channel — typically called once per bot/app loop.
1143    pub fn stream_updates(&self) -> UpdateStream {
1144        let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
1145        // The internal channel is: reader_task → update_tx → _update_rx
1146        // We forward _update_rx → caller_tx so the caller gets an owned stream.
1147        let internal_rx = self._update_rx.clone();
1148        tokio::spawn(async move {
1149            // Lock once and hold — this is the sole consumer of the internal channel.
1150            let mut guard = internal_rx.lock().await;
1151            while let Some(upd) = guard.recv().await {
1152                if caller_tx.send(upd).is_err() { break; }
1153            }
1154        });
1155        UpdateStream { rx }
1156    }
1157
1158    // ── Network hint ───────────────────────────────────────────────────────
1159
1160    /// Signal that network connectivity has been restored.
1161    ///
1162    /// Call this from platform network-change callbacks — Android's
1163    /// `ConnectivityManager`, iOS `NWPathMonitor`, or any other OS hook —
1164    /// to make the client attempt an immediate reconnect instead of waiting
1165    /// for the exponential backoff timer to expire.
1166    ///
1167    /// Safe to call at any time: if the connection is healthy the hint is
1168    /// silently ignored by the reader task; if it is in a backoff loop it
1169    /// wakes up and tries again right away.
1170    pub fn signal_network_restored(&self) {
1171        let _ = self.inner.network_hint_tx.send(());
1172    }
1173
1174    // ── Reader task ────────────────────────────────────────────────────────
1175    // Decrypts frames without holding any lock, then routes:
1176    //   rpc_result  → pending map (oneshot to waiting RPC caller)
1177    //   update      → update_tx  (delivered to stream_updates consumers)
1178    //   bad_server_salt → updates writer salt
1179    //
1180    // On error: drains pending with Io errors (so AutoSleep retries callers),
1181    // then loops with exponential backoff until reconnect succeeds.
1182    // network_hint_rx lets external callers (Android/iOS) skip the backoff.
1183    //
1184    // DC migration / reconnect: the new read half arrives via new_conn_rx.
1185    // The select! between recv_frame_owned and new_conn_rx.recv() ensures we
1186    // switch to the new connection immediately, without waiting for the next
1187    // frame on the old (now stale) connection.
1188
1189    // ── Reader task supervisor ────────────────────────────────────────────────
1190    //
1191    // run_reader_task is the outer supervisor. It wraps reader_loop in a
1192    // restart loop so that if reader_loop ever exits for any reason other than
1193    // a clean shutdown request, it is automatically reconnected and restarted.
1194    //
1195    // This mirrors what Telegram Desktop does: the network thread is considered
1196    // infrastructure and is never allowed to die permanently.
1197    //
1198    // Restart sequence on unexpected exit:
1199    //   1. Drain all pending RPCs with ConnectionReset so callers unblock.
1200    //   2. Exponential-backoff reconnect loop (500 ms → 30 s cap) until TCP
1201    //      succeeds, respecting the shutdown token at every sleep point.
1202    //   3. Spawn init_connection in a background task (same deadlock-safe
1203    //      pattern as do_reconnect_loop) and pass the oneshot receiver as the
1204    //      initial_init_rx to the restarted reader_loop.
1205    //   4. reader_loop picks up init_rx immediately on its first iteration and
1206    //      handles success/failure exactly like a mid-session reconnect.
1207    #[allow(clippy::too_many_arguments)]
1208    async fn run_reader_task(
1209        &self,
1210        read_half:   OwnedReadHalf,
1211        frame_kind:  FrameKind,
1212        auth_key:    [u8; 256],
1213        session_id:  i64,
1214        mut new_conn_rx:      mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1215        mut network_hint_rx:  mpsc::UnboundedReceiver<()>,
1216        shutdown_token:       CancellationToken,
1217    ) {
1218        let mut rh  = read_half;
1219        let mut fk  = frame_kind;
1220        let mut ak  = auth_key;
1221        let mut sid = session_id;
1222        // On first start no init is needed (connect() already called it).
1223        // On restarts we pass the spawned init task so reader_loop handles it.
1224        let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
1225        let mut restart_count: u32 = 0;
1226
1227        loop {
1228            tokio::select! {
1229                // ── Clean shutdown ────────────────────────────────────────────
1230                _ = shutdown_token.cancelled() => {
1231                    log::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
1232                    let mut pending = self.inner.pending.lock().await;
1233                    for (_, tx) in pending.drain() {
1234                        let _ = tx.send(Err(InvocationError::Dropped));
1235                    }
1236                    return;
1237                }
1238
1239                // ── reader_loop ───────────────────────────────────────────────
1240                _ = self.reader_loop(
1241                        rh, fk, ak, sid,
1242                        restart_init_rx.take(),
1243                        &mut new_conn_rx, &mut network_hint_rx,
1244                    ) => {}
1245            }
1246
1247            // If we reach here, reader_loop returned without a shutdown signal.
1248            // This should never happen in normal operation — treat it as a fault.
1249            if shutdown_token.is_cancelled() {
1250                log::info!("[layer] Reader task: exiting after loop (shutdown).");
1251                return;
1252            }
1253
1254            restart_count += 1;
1255            log::error!(
1256                "[layer] Reader loop exited unexpectedly (restart #{restart_count}) —                  supervisor reconnecting …"
1257            );
1258
1259            // Step 1: drain all pending RPCs so callers don't hang.
1260            {
1261                let mut pending = self.inner.pending.lock().await;
1262                for (_, tx) in pending.drain() {
1263                    let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
1264                        std::io::ErrorKind::ConnectionReset,
1265                        "reader task restarted",
1266                    ))));
1267                }
1268            }
1269            // Fix #9: drain sent_bodies alongside pending to prevent unbounded growth.
1270            self.inner.writer.lock().await.sent_bodies.clear();
1271
1272            // Step 2: reconnect with exponential backoff, honouring shutdown.
1273            let mut delay_ms = RECONNECT_BASE_MS;
1274            let new_conn = loop {
1275                log::info!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
1276                tokio::select! {
1277                    _ = shutdown_token.cancelled() => {
1278                        log::info!("[layer] Supervisor: shutdown during reconnect, exiting.");
1279                        return;
1280                    }
1281                    _ = sleep(Duration::from_millis(delay_ms)) => {}
1282                }
1283
1284                // do_reconnect ignores both params (_old_auth_key, _old_frame_kind) —
1285                // it re-reads everything from ClientInner. rh/fk/ak/sid were moved
1286                // into reader_loop, so we pass dummies here; fresh values come back
1287                // from the Ok result and replace them below.
1288                let dummy_ak = [0u8; 256];
1289                let dummy_fk = FrameKind::Abridged;
1290                match self.do_reconnect(&dummy_ak, &dummy_fk).await {
1291                    Ok(conn) => break conn,
1292                    Err(e)   => {
1293                        log::warn!("[layer] Supervisor: reconnect failed ({e})");
1294                        let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
1295                        delay_ms = jitter_delay(next).as_millis() as u64;
1296                    }
1297                }
1298            };
1299
1300            let (new_rh, new_fk, new_ak, new_sid) = new_conn;
1301            rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
1302
1303            // Step 3: spawn init_connection (cannot await inline — reader must
1304            // be running to route the RPC response, or we deadlock).
1305            let (init_tx, init_rx) = oneshot::channel();
1306            let c   = self.clone();
1307            let utx = self.inner.update_tx.clone();
1308            tokio::spawn(async move {
1309                // Respect FLOOD_WAIT (same as do_reconnect_loop).
1310                let result = loop {
1311                    match c.init_connection().await {
1312                        Ok(()) => break Ok(()),
1313                        Err(InvocationError::Rpc(ref r))
1314                            if r.flood_wait_seconds().is_some() =>
1315                        {
1316                            let secs = r.flood_wait_seconds().unwrap();
1317                            log::warn!(
1318                                "[layer] Supervisor init_connection FLOOD_WAIT_{secs} — waiting"
1319                            );
1320                            sleep(Duration::from_secs(secs + 1)).await;
1321                        }
1322                        Err(e) => break Err(e),
1323                    }
1324                };
1325                if result.is_ok()
1326                    && let Ok(missed) = c.get_difference().await {
1327                    for u in missed { let _ = utx.send(u); }
1328                }
1329                let _ = init_tx.send(result);
1330            });
1331            restart_init_rx = Some(init_rx);
1332
1333            log::info!("[layer] Supervisor: restarting reader loop (restart #{restart_count}) …");
1334            // Loop back → reader_loop restarts with the fresh connection.
1335        }
1336    }
1337
1338    #[allow(clippy::too_many_arguments)]
1339    async fn reader_loop(
1340        &self,
1341        mut rh:          OwnedReadHalf,
1342        mut fk:          FrameKind,
1343        mut ak:          [u8; 256],
1344        mut sid:         i64,
1345        // When Some, the supervisor has already spawned init_connection on our
1346        // behalf (supervisor restart path). On first start this is None.
1347        initial_init_rx:  Option<oneshot::Receiver<Result<(), InvocationError>>>,
1348        new_conn_rx:      &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1349        network_hint_rx:  &mut mpsc::UnboundedReceiver<()>,
1350    ) {
1351        // Tracks an in-flight init_connection task spawned after every reconnect.
1352        // The reader loop must keep routing frames while we wait so the RPC
1353        // response can reach its oneshot sender (otherwise → 30 s self-deadlock).
1354        // If init fails we re-enter the reconnect loop immediately.
1355        let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
1356        // How many consecutive init_connection failures have occurred on the
1357        // *current* auth key.  We retry with the same key up to 2 times before
1358        // assuming the key is stale and clearing it for a fresh DH handshake.
1359        // This prevents a transient 30 s timeout from nuking a valid session.
1360        let mut init_fail_count: u32 = 0;
1361
1362        loop {
1363            tokio::select! {
1364                // ── Normal frame (or application-level keepalive timeout) ─────
1365                outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
1366                    match outcome {
1367                        FrameOutcome::Frame(mut raw) => {
1368                            let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
1369                                Ok(m)  => m,
1370                                Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
1371                            };
1372                            if msg.salt != 0 {
1373                                self.inner.writer.lock().await.enc.salt = msg.salt;
1374                            }
1375                            self.route_frame(msg.body, msg.msg_id).await;
1376
1377                            // G-04/G-07: Acks are NOT flushed here standalone.
1378                            // They accumulate in pending_ack and are bundled into the next
1379                            // outgoing request container, matching grammers' behavior and
1380                            // avoiding an extra standalone frame (and extra RTT exposure).
1381                        }
1382
1383                        FrameOutcome::Error(e) => {
1384                            log::warn!("[layer] Reader: connection error: {e}");
1385                            drop(init_rx.take()); // discard any in-flight init
1386
1387                            // Fail all in-flight RPCs immediately so AutoSleep
1388                            // retries them as soon as we reconnect.
1389                            {
1390                                let mut pending = self.inner.pending.lock().await;
1391                                let msg = e.to_string();
1392                                for (_, tx) in pending.drain() {
1393                                    let _ = tx.send(Err(InvocationError::Io(
1394                                        std::io::Error::new(
1395                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
1396                                }
1397                            }
1398                            // Fix #9: drain sent_bodies so it doesn't grow unbounded under loss.
1399                            self.inner.writer.lock().await.sent_bodies.clear();
1400
1401                            match self.do_reconnect_loop(
1402                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1403                                network_hint_rx,
1404                            ).await {
1405                                Some(rx) => { init_rx = Some(rx); }
1406                                None     => return, // shutdown requested
1407                            }
1408                        }
1409
1410                        FrameOutcome::Keepalive => {} // ping sent successfully; loop
1411                    }
1412                }
1413
1414                // ── DC migration / deliberate reconnect ──────────────────────
1415                maybe = new_conn_rx.recv() => {
1416                    if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
1417                        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
1418                        log::info!("[layer] Reader: switched to new connection.");
1419                    } else {
1420                        break; // reconnect_tx dropped → client is shutting down
1421                    }
1422                }
1423
1424
1425                // ── init_connection result (polled only when Some) ────────────
1426                init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
1427                    init_rx = None;
1428                    match init_result {
1429                        Ok(Ok(())) => {
1430                            init_fail_count = 0;
1431                            // Fully initialised. Persist updated DC/session state.
1432                            // Retry up to 3 times — a transient I/O error (e.g. the
1433                            // filesystem briefly unavailable on Android) should not
1434                            // cause the auth key to diverge from disk permanently.
1435                            log::info!("[layer] Reconnected to Telegram ✓ — session live, replaying missed updates …");
1436                            for attempt in 1u8..=3 {
1437                                match self.save_session().await {
1438                                    Ok(()) => break,
1439                                    Err(e) if attempt < 3 => {
1440                                        log::warn!(
1441                                            "[layer] save_session failed (attempt {attempt}/3): {e}"
1442                                        );
1443                                        sleep(Duration::from_millis(500)).await;
1444                                    }
1445                                    Err(e) => {
1446                                        log::error!(
1447                                            "[layer] save_session permanently failed after 3 attempts: {e}"
1448                                        );
1449                                    }
1450                                }
1451                            }
1452                        }
1453
1454                        Ok(Err(e)) => {
1455                            // TCP connected but init RPC failed.
1456                            //
1457                            // Grammers' approach: only treat the key as stale when
1458                            // Telegram sends a definitive "bad auth key" signal —
1459                            // a -404 transport error code.  A 30 s timeout is almost
1460                            // always a transient network issue and must NOT clear the
1461                            // session (doing so creates a brand-new session in Telegram's
1462                            // active devices list and orphans the old one).
1463                            let key_is_stale = match &e {
1464                                // Telegram's abridged-transport error code for invalid key
1465                                InvocationError::Rpc(r) if r.code == -404 => true,
1466                                // Early EOF immediately after connecting = server rejected key
1467                                InvocationError::Io(io) if io.kind() == std::io::ErrorKind::UnexpectedEof
1468                                    || io.kind() == std::io::ErrorKind::ConnectionReset => true,
1469                                // 30 s timeout → transient; keep the key and retry
1470                                _ => false,
1471                            };
1472
1473                            if key_is_stale {
1474                                log::warn!(
1475                                    "[layer] init_connection failed with definitive bad-key signal ({e}) \
1476                                     — clearing auth key for fresh DH …"
1477                                );
1478                                init_fail_count = 0;
1479                                let home_dc_id = *self.inner.home_dc_id.lock().await;
1480                                let mut opts = self.inner.dc_options.lock().await;
1481                                if let Some(entry) = opts.get_mut(&home_dc_id) {
1482                                    entry.auth_key = None;
1483                                }
1484                            } else {
1485                                init_fail_count += 1;
1486                                log::warn!(
1487                                    "[layer] init_connection failed transiently (attempt {init_fail_count}, {e}) \
1488                                     — retrying with same key …"
1489                                );
1490                            }
1491                            {
1492                                let mut pending = self.inner.pending.lock().await;
1493                                let msg = e.to_string();
1494                                for (_, tx) in pending.drain() {
1495                                    let _ = tx.send(Err(InvocationError::Io(
1496                                        std::io::Error::new(
1497                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
1498                                }
1499                            }
1500                            match self.do_reconnect_loop(
1501                                0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
1502                            ).await {
1503                                Some(rx) => { init_rx = Some(rx); }
1504                                None     => return,
1505                            }
1506                        }
1507
1508                        Err(_) => {
1509                            // init task was dropped (shouldn't normally happen).
1510                            log::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
1511                            match self.do_reconnect_loop(
1512                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1513                                network_hint_rx,
1514                            ).await {
1515                                Some(rx) => { init_rx = Some(rx); }
1516                                None     => return,
1517                            }
1518                        }
1519                    }
1520                }
1521            }
1522        }
1523    }
1524
1525    /// Route a decrypted MTProto frame body to either a pending RPC caller or update_tx.
1526    async fn route_frame(&self, body: Vec<u8>, msg_id: i64) {
1527        if body.len() < 4 { return; }
1528        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1529
1530        match cid {
1531            ID_RPC_RESULT => {
1532                if body.len() < 12 { return; }
1533                let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1534                let inner = body[12..].to_vec();
1535                // G-04: ack the rpc_result container message
1536                self.inner.writer.lock().await.pending_ack.push(msg_id);
1537                let result = unwrap_envelope(inner);
1538                if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1539                    // G-02: request resolved — remove from sent_bodies
1540                    self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1541                    let to_send = match result {
1542                        Ok(EnvelopeResult::Payload(p)) => Ok(p),
1543                        Ok(EnvelopeResult::Updates(us)) => {
1544                            for u in us { let _ = self.inner.update_tx.send(u); }
1545                            Ok(vec![])
1546                        }
1547                        Ok(EnvelopeResult::None) => Ok(vec![]),
1548                        Err(e) => Err(e),
1549                    };
1550                    let _ = tx.send(to_send);
1551                }
1552            }
1553            ID_RPC_ERROR => {
1554                log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
1555            }
1556            ID_MSG_CONTAINER => {
1557                if body.len() < 8 { return; }
1558                let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1559                let mut pos = 8usize;
1560                for _ in 0..count {
1561                    if pos + 16 > body.len() { break; }
1562                    // Extract inner msg_id for correct ack tracking
1563                    let inner_msg_id = i64::from_le_bytes(body[pos..pos + 8].try_into().unwrap());
1564                    let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1565                    pos += 16;
1566                    if pos + inner_len > body.len() { break; }
1567                    let inner = body[pos..pos + inner_len].to_vec();
1568                    pos += inner_len;
1569                    Box::pin(self.route_frame(inner, inner_msg_id)).await;
1570                }
1571            }
1572            ID_GZIP_PACKED => {
1573                let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
1574                if let Ok(inflated) = gz_inflate(&bytes) {
1575                    // pass same outer msg_id — gzip has no msg_id of its own
1576                    Box::pin(self.route_frame(inflated, msg_id)).await;
1577                }
1578            }
1579            ID_BAD_SERVER_SALT => {
1580                // bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int new_server_salt:long
1581                // body[0..4]   = constructor
1582                // body[4..12]  = bad_msg_id
1583                // body[12..16] = bad_msg_seqno
1584                // body[16..24] = new_server_salt  ← was wrongly [8..16]
1585                if body.len() >= 24 {
1586                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1587                    let new_salt   = i64::from_le_bytes(body[16..24].try_into().unwrap());
1588
1589                    // Update session salt so re-sent request uses the correct salt.
1590                    self.inner.writer.lock().await.enc.salt = new_salt;
1591                    log::debug!("[layer] bad_server_salt: bad_msg_id={bad_msg_id} salt={new_salt:#x}");
1592
1593                    // Re-transmit the original request under the new salt.
1594                    {
1595                        let mut w = self.inner.writer.lock().await;
1596                        if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1597                            let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1598                            let fk = w.frame_kind.clone();
1599                            w.sent_bodies.insert(new_msg_id, orig_body);
1600                            // Drop writer before touching pending (lock-order safety).
1601                            drop(w);
1602                            let mut pending = self.inner.pending.lock().await;
1603                            if let Some(tx) = pending.remove(&bad_msg_id) {
1604                                pending.insert(new_msg_id, tx);
1605                                drop(pending);
1606                                let mut w = self.inner.writer.lock().await;
1607                                if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1608                                    log::warn!("[layer] bad_server_salt re-send failed: {e}");
1609                                } else {
1610                                    log::debug!("[layer] bad_server_salt re-sent {bad_msg_id}→{new_msg_id}");
1611                                }
1612                            }
1613                        }
1614                    }
1615
1616                    // G-08: proactively refresh salt pool in background.
1617                    let inner = Arc::clone(&self.inner);
1618                    tokio::spawn(async move {
1619                        log::debug!("[layer] G-08 proactive GetFutureSalts …");
1620                        // get_future_salts#b921bd04 num:int = FutureSalts
1621                        let mut req_body = Vec::with_capacity(8);
1622                        req_body.extend_from_slice(&0xb921bd04_u32.to_le_bytes());
1623                        req_body.extend_from_slice(&64_i32.to_le_bytes());
1624                        // G-09 fix: store in sent_bodies so bad_msg_notify can re-send it.
1625                        let (wire, fs_msg_id) = {
1626                            let mut w = inner.writer.lock().await;
1627                            let (wire, id) = w.enc.pack_body_with_msg_id(&req_body, true);
1628                            w.sent_bodies.insert(id, req_body);
1629                            (wire, id)
1630                        };
1631                        let fk = inner.writer.lock().await.frame_kind.clone();
1632                        let (tx, rx) = tokio::sync::oneshot::channel();
1633                        inner.pending.lock().await.insert(fs_msg_id, tx);
1634                        {
1635                            let mut w = inner.writer.lock().await;
1636                            if send_frame_write(&mut w.write_half, &wire, &fk).await.is_err() {
1637                                inner.pending.lock().await.remove(&fs_msg_id);
1638                                inner.writer.lock().await.sent_bodies.remove(&fs_msg_id);
1639                                return;
1640                            }
1641                        }
1642                        let _ = tokio::time::timeout(
1643                            std::time::Duration::from_secs(30), rx
1644                        ).await;
1645                    });
1646                }
1647            }
1648            ID_PONG => {
1649                // Pong is the server's reply to Ping — NOT inside rpc_result.
1650                // pong#347773c5  msg_id:long  ping_id:long
1651                // body[4..12] = msg_id of the original Ping → key in pending map
1652                if body.len() >= 20 {
1653                    let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1654                    if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
1655                        self.inner.writer.lock().await.sent_bodies.remove(&ping_msg_id);
1656                        let _ = tx.send(Ok(body));
1657                    }
1658                }
1659            }
1660            // ── G-09: FutureSalts arrives bare (like Pong) ───────────────────
1661            ID_FUTURE_SALTS => {
1662                // future_salts#ae500895
1663                //   [0..4]   constructor
1664                //   [4..12]  req_msg_id (long)
1665                //   [12..16] now (int)
1666                //   [16..20] vector constructor 0x1cb5c415
1667                //   [20..24] count (int)          ← was wrongly [16..20]
1668                //   per entry (bare, no constructor):
1669                //     [+0..+4]  valid_since (int)
1670                //     [+4..+8]  valid_until (int)
1671                //     [+8..+16] salt (long)
1672                //   first entry starts at byte 24 → salt at [32..40] ← was [28..36]
1673                if body.len() >= 12 {
1674                    let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1675                    if body.len() >= 40 {
1676                        let count = u32::from_le_bytes(body[20..24].try_into().unwrap()) as usize;
1677                        if count > 0 {
1678                            let salt_val = i64::from_le_bytes(body[32..40].try_into().unwrap());
1679                            self.inner.writer.lock().await.enc.salt = salt_val;
1680                            log::debug!("[layer] G-09 FutureSalts: salt={salt_val:#x} count={count}");
1681                        }
1682                    }
1683                    if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1684                        self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1685                        let _ = tx.send(Ok(body));
1686                    }
1687                }
1688            }
1689            ID_NEW_SESSION => {
1690                // new_session_created#9ec20908 first_msg_id:long unique_id:long server_salt:long
1691                if body.len() >= 28 {
1692                    let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
1693                    self.inner.writer.lock().await.enc.salt = server_salt;
1694                    log::debug!("[layer] new_session_created — salt reset to {server_salt:#x}");
1695                }
1696            }
1697            // ── G-02 + G-03 + G-12: bad_msg_notification ────────────────────
1698            ID_BAD_MSG_NOTIFY => {
1699                // bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int error_code:int
1700                if body.len() < 20 { return; }
1701                let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1702                let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
1703                log::warn!("[layer] bad_msg_notification: msg_id={bad_msg_id} code={error_code}");
1704
1705                // Phase 1: hold writer only for enc-state mutations + packing.
1706                // The lock is dropped BEFORE we touch `pending`, eliminating the
1707                // writer→pending lock-order deadlock that existed before this fix.
1708                let resend: Option<(Vec<u8>, i64, FrameKind)> = {
1709                    let mut w = self.inner.writer.lock().await;
1710                    // G-12: correct clock skew on codes 16/17
1711                    if error_code == 16 || error_code == 17 {
1712                        w.enc.correct_time_offset(bad_msg_id);
1713                    }
1714                    // G-03: correct seq_no on codes 32/33
1715                    if error_code == 32 || error_code == 33 {
1716                        w.enc.correct_seq_no(error_code);
1717                    }
1718                    // G-02: pack re-send if we have the original body.
1719                    if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1720                        let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1721                        let fk = w.frame_kind.clone();
1722                        w.sent_bodies.insert(new_msg_id, orig_body);
1723                        Some((wire, new_msg_id, fk))
1724                    } else {
1725                        None
1726                    }
1727                }; // ← writer lock released here
1728
1729                match resend {
1730                    Some((wire, new_msg_id, fk)) => {
1731                        // Phase 2: re-key pending (no writer lock held).
1732                        let has_waiter = {
1733                            let mut pending = self.inner.pending.lock().await;
1734                            if let Some(tx) = pending.remove(&bad_msg_id) {
1735                                pending.insert(new_msg_id, tx);
1736                                true
1737                            } else {
1738                                false
1739                            }
1740                        };
1741                        if has_waiter {
1742                            // Phase 3: re-acquire writer only for TCP send.
1743                            let mut w = self.inner.writer.lock().await;
1744                            if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1745                                log::warn!("[layer] G-02 re-send failed: {e}");
1746                                w.sent_bodies.remove(&new_msg_id);
1747                            } else {
1748                                log::debug!("[layer] G-02 re-sent {bad_msg_id}→{new_msg_id}");
1749                            }
1750                        } else {
1751                            self.inner.writer.lock().await.sent_bodies.remove(&new_msg_id);
1752                        }
1753                    }
1754                    None => {
1755                        // Not in sent_bodies — surface retriable error to the waiter.
1756                        if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
1757                            let _ = tx.send(Err(InvocationError::Deserialize(
1758                                format!("bad_msg_notification code={error_code}")
1759                            )));
1760                        }
1761                    }
1762                }
1763            }
1764            // ── G-11: MsgDetailedInfo → ack the answer_msg_id ───────────────
1765            ID_MSG_DETAILED_INFO => {
1766                // msg_detailed_info#276d3ec6 msg_id:long answer_msg_id:long bytes:int status:int
1767                // body[4..12]  = msg_id (original request)
1768                // body[12..20] = answer_msg_id (what to ack)
1769                if body.len() >= 20 {
1770                    let answer_msg_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
1771                    self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1772                    log::trace!("[layer] G-11 MsgDetailedInfo: queued ack for answer_msg_id={answer_msg_id}");
1773                }
1774            }
1775            ID_MSG_NEW_DETAIL_INFO => {
1776                // msg_new_detailed_info#809db6df answer_msg_id:long bytes:int status:int
1777                // body[4..12] = answer_msg_id
1778                if body.len() >= 12 {
1779                    let answer_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1780                    self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1781                    log::trace!("[layer] G-11 MsgNewDetailedInfo: queued ack for {answer_msg_id}");
1782                }
1783            }
1784            // ── G-14: MsgResendReq → re-send the requested msg_ids ──────────
1785            ID_MSG_RESEND_REQ => {
1786                // msg_resend_req#7d861a08 msg_ids:Vector<long>
1787                // body[4..8]   = 0x1cb5c415 (Vector constructor)
1788                // body[8..12]  = count
1789                // body[12..]   = msg_ids
1790                if body.len() >= 12 {
1791                    let count = u32::from_le_bytes(body[8..12].try_into().unwrap()) as usize;
1792                    let mut w = self.inner.writer.lock().await;
1793                    let fk = w.frame_kind.clone();
1794                    for i in 0..count {
1795                        let off = 12 + i * 8;
1796                        if off + 8 > body.len() { break; }
1797                        let resend_id = i64::from_le_bytes(body[off..off + 8].try_into().unwrap());
1798                        if let Some(orig_body) = w.sent_bodies.remove(&resend_id) {
1799                            let (wire, new_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1800                            // Re-key the pending waiter
1801                            let mut pending = self.inner.pending.lock().await;
1802                            if let Some(tx) = pending.remove(&resend_id) {
1803                                pending.insert(new_id, tx);
1804                            }
1805                            drop(pending);
1806                            w.sent_bodies.insert(new_id, orig_body);
1807                            send_frame_write(&mut w.write_half, &wire, &fk).await.ok();
1808                            log::debug!("[layer] G-14 MsgResendReq: resent {resend_id} → {new_id}");
1809                        }
1810                    }
1811                }
1812            }
1813            // ── G-13: log DestroySession outcomes ───────────────────────────
1814            0xe22045fc => {
1815                log::warn!("[layer] destroy_session_ok received — session terminated by server");
1816            }
1817            0x62d350c9 => {
1818                log::warn!("[layer] destroy_session_none received — session was already gone");
1819            }
1820            ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
1821            | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
1822            | ID_UPDATES_TOO_LONG => {
1823                // G-04: ack update frames too
1824                self.inner.writer.lock().await.pending_ack.push(msg_id);
1825                // Bug #1 fix: route through pts/qts/seq gap-checkers
1826                self.dispatch_updates(&body).await;
1827            }
1828            _ => {}
1829        }
1830    }
1831
1832
1833    // ── Bug #1: pts-aware update dispatch ────────────────────────────────────
1834
1835    /// Parse an incoming update container and route each update through the
1836    /// pts/qts/seq gap-checkers before forwarding to `update_tx`.
1837    async fn dispatch_updates(&self, body: &[u8]) {
1838        if body.len() < 4 { return; }
1839        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1840
1841        // updatesTooLong — we must call getDifference to recover missed updates.
1842        if cid == 0xe317af7e_u32 {
1843            log::warn!("[layer] updatesTooLong — getDifference");
1844            let c   = self.clone();
1845            let utx = self.inner.update_tx.clone();
1846            tokio::spawn(async move {
1847                match c.get_difference().await {
1848                    Ok(updates) => { for u in updates { let _ = utx.send(u); } }
1849                    Err(e) => log::warn!("[layer] getDifference after updatesTooLong: {e}"),
1850                }
1851            });
1852            return;
1853        }
1854
1855        // For updateShortMessage / updateShortChatMessage we use the existing
1856        // high-level parser (they have pts but no bare tl::enums::Update wrapper).
1857        if cid == 0x313bc7f8 || cid == 0x4d6deea5 {
1858            // Parse pts out of the short-message header.
1859            // updateShortMessage: flags(4) id(4) user_id(8) ... pts(?) pts_count(?)
1860            // Easier: use parse_updates which already handles them, then gap-check.
1861            for u in update::parse_updates(body) {
1862                let _ = self.inner.update_tx.send(u);
1863            }
1864            return;
1865        }
1866
1867        // Check outer seq for Updates / UpdatesCombined containers.
1868        {
1869            use layer_tl_types::{Cursor, Deserializable};
1870            let mut cur = Cursor::from_slice(body);
1871            let seq_info: Option<(i32, i32)> = match cid {
1872                0x74ae4240 => {
1873                    // updates#74ae4240
1874                    match tl::enums::Updates::deserialize(&mut cur) {
1875                        Ok(tl::enums::Updates::Updates(u)) => Some((u.seq, u.seq)),
1876                        _ => None,
1877                    }
1878                }
1879                0x725b04c3 => {
1880                    // updatesCombined#725b04c3
1881                    match tl::enums::Updates::deserialize(&mut cur) {
1882                        Ok(tl::enums::Updates::Combined(u)) => Some((u.seq, u.seq_start)),
1883                        _ => None,
1884                    }
1885                }
1886                _ => None,
1887            };
1888            if let Some((seq, seq_start)) = seq_info
1889                && seq != 0 {
1890                let c   = self.clone();
1891                let utx = self.inner.update_tx.clone();
1892                tokio::spawn(async move {
1893                    match c.check_and_fill_seq_gap(seq, seq_start).await {
1894                        Ok(extra) => { for u in extra { let _ = utx.send(u); } }
1895                        Err(e)    => log::warn!("[layer] seq gap fill: {e}"),
1896                    }
1897                });
1898            }
1899        }
1900
1901        // Flatten the container into bare tl::enums::Update items.
1902        use layer_tl_types::{Cursor, Deserializable};
1903        let mut cur = Cursor::from_slice(body);
1904        let raw: Vec<tl::enums::Update> = match cid {
1905            0x78d4dec1 => { // updateShort
1906                match tl::types::UpdateShort::deserialize(&mut cur) {
1907                    Ok(u) => vec![u.update],
1908                    Err(_) => vec![],
1909                }
1910            }
1911            0x74ae4240 => { // updates
1912                match tl::enums::Updates::deserialize(&mut cur) {
1913                    Ok(tl::enums::Updates::Updates(u)) => u.updates,
1914                    _ => vec![],
1915                }
1916            }
1917            0x725b04c3 => { // updatesCombined
1918                match tl::enums::Updates::deserialize(&mut cur) {
1919                    Ok(tl::enums::Updates::Combined(u)) => u.updates,
1920                    _ => vec![],
1921                }
1922            }
1923            _ => vec![],
1924        };
1925
1926        for upd in raw {
1927            self.dispatch_single_update(upd).await;
1928        }
1929    }
1930
    /// Route one bare `tl::enums::Update` through the pts/qts gap-checker,
    /// then emit surviving updates to `update_tx`.
    ///
    /// Updates that carry a sequence counter (global `pts`, per-channel
    /// `pts`, or `qts`) are handed to the matching `check_and_fill_*` method
    /// in a *spawned* task, which forwards whatever survives the gap check to
    /// `update_tx`; counter-less updates are forwarded directly.
    async fn dispatch_single_update(&self, upd: tl::enums::Update) {
        // Two-phase: inspect pts fields via reference first (all Copy), then
        // convert to high-level Update (consumes upd). Avoids borrow-then-move.
        //
        // `carry` marks whether the converted high-level update itself should
        // be carried into the gap-checker (message events) or only the
        // counters matter (read-history events).
        enum Kind {
            GlobalPts  { pts: i32, pts_count: i32, carry: bool },
            ChannelPts { channel_id: i64, pts: i32, pts_count: i32, carry: bool },
            Qts        { qts: i32 },
            Passthrough,
        }

        // Channel id of a message's peer; 0 when the variant is not
        // `Message::Message` or its peer is not a channel.
        fn ch_from_msg(msg: &tl::enums::Message) -> i64 {
            if let tl::enums::Message::Message(m) = msg
                && let tl::enums::Peer::Channel(c) = &m.peer_id { return c.channel_id; }
            0
        }

        let kind = {
            use tl::enums::Update::*;
            match &upd {
                NewMessage(u)            => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
                EditMessage(u)           => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
                DeleteMessages(u)        => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
                ReadHistoryInbox(u)      => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
                ReadHistoryOutbox(u)     => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
                NewChannelMessage(u)     => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
                EditChannelMessage(u)    => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
                DeleteChannelMessages(u) => Kind::ChannelPts { channel_id: u.channel_id, pts: u.pts, pts_count: u.pts_count, carry: true },
                NewEncryptedMessage(u)   => Kind::Qts { qts: u.qts },
                _                        => Kind::Passthrough,
            }
        };

        // Conversion consumes `upd`; yields zero or more high-level updates.
        let high = update::from_single_update_pub(upd);

        let to_send: Vec<update::Update> = match kind {
            Kind::GlobalPts { pts, pts_count, carry } => {
                let first = if carry { high.into_iter().next() } else { None };
                // DEADLOCK FIX: never await an RPC inside the reader task.
                // Spawn gap-fill as a separate task; it can receive the RPC
                // response because the reader loop continues running.
                let c   = self.clone();
                let utx = self.inner.update_tx.clone();
                tokio::spawn(async move {
                    match c.check_and_fill_gap(pts, pts_count, first).await {
                        Ok(v)  => { for u in v { let _ = utx.send(u); } }
                        Err(e) => log::warn!("[layer] pts gap: {e}"),
                    }
                });
                vec![]
            }
            Kind::ChannelPts { channel_id, pts, pts_count, carry } => {
                let first = if carry { high.into_iter().next() } else { None };
                if channel_id != 0 {
                    // DEADLOCK FIX: spawn; same reasoning as GlobalPts above.
                    let c   = self.clone();
                    let utx = self.inner.update_tx.clone();
                    tokio::spawn(async move {
                        match c.check_and_fill_channel_gap(channel_id, pts, pts_count, first).await {
                            Ok(v)  => { for u in v { let _ = utx.send(u); } }
                            Err(e) => log::warn!("[layer] ch pts gap: {e}"),
                        }
                    });
                    vec![]
                } else {
                    // No channel id could be extracted — skip the gap check
                    // and emit the first converted update (if any) directly.
                    first.into_iter().collect()
                }
            }
            Kind::Qts { qts } => {
                // DEADLOCK FIX: spawn; same reasoning as above.
                let c = self.clone();
                tokio::spawn(async move {
                    if let Err(e) = c.check_and_fill_qts_gap(qts, 1).await {
                        log::warn!("[layer] qts gap: {e}");
                    }
                });
                vec![]
            }
            Kind::Passthrough => high,
        };

        // Whatever was not delegated to a spawned gap-filler goes out now.
        for u in to_send {
            let _ = self.inner.update_tx.send(u);
        }
    }
2017
2018    /// Loops with exponential backoff until a TCP+DH reconnect succeeds, then
2019    /// spawns `init_connection` in a background task and returns a oneshot
2020    /// receiver for its result.
2021    ///
2022    /// - `initial_delay_ms = RECONNECT_BASE_MS` for a fresh disconnect.
2023    /// - `initial_delay_ms = 0` when TCP already worked but init failed — we
2024    ///   want to retry init immediately rather than waiting another full backoff.
2025    ///
2026    /// Returns `None` if the shutdown token fires (caller should exit).
2027    async fn do_reconnect_loop(
2028        &self,
2029        initial_delay_ms: u64,
2030        rh:  &mut OwnedReadHalf,
2031        fk:  &mut FrameKind,
2032        ak:  &mut [u8; 256],
2033        sid: &mut i64,
2034        network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
2035    ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
2036        let mut delay_ms = if initial_delay_ms == 0 {
2037            // Caller explicitly requests an immediate first attempt (e.g. init
2038            // failed but TCP is up — no reason to wait before the next try).
2039            0
2040        } else {
2041            initial_delay_ms.max(RECONNECT_BASE_MS)
2042        };
2043        loop {
2044            log::info!("[layer] Reconnecting in {delay_ms} ms …");
2045            tokio::select! {
2046                _ = sleep(Duration::from_millis(delay_ms)) => {}
2047                hint = network_hint_rx.recv() => {
2048                    hint?; // shutdown
2049                    log::info!("[layer] Network hint → skipping backoff, reconnecting now");
2050                }
2051            }
2052
2053            match self.do_reconnect(ak, fk).await {
2054                Ok((new_rh, new_fk, new_ak, new_sid)) => {
2055                    *rh = new_rh; *fk = new_fk; *ak = new_ak; *sid = new_sid;
2056                    log::info!("[layer] TCP reconnected ✓ — initialising session …");
2057
2058                    // Spawn init_connection. MUST NOT be awaited inline — the
2059                    // reader loop must resume so it can route the RPC response.
2060                    // We give back a oneshot so the reader can act on failure.
2061                    let (init_tx, init_rx) = oneshot::channel();
2062                    let c   = self.clone();
2063                    let utx = self.inner.update_tx.clone();
2064                    tokio::spawn(async move {
2065                        // Respect FLOOD_WAIT before sending the result back.
2066                        // Without this, a FLOOD_WAIT from Telegram during init
2067                        // would immediately re-trigger another reconnect attempt,
2068                        // which would itself hit FLOOD_WAIT — a ban spiral.
2069                        let result = loop {
2070                            match c.init_connection().await {
2071                                Ok(()) => break Ok(()),
2072                                Err(InvocationError::Rpc(ref r))
2073                                    if r.flood_wait_seconds().is_some() =>
2074                                {
2075                                    let secs = r.flood_wait_seconds().unwrap();
2076                                    log::warn!(
2077                                        "[layer] init_connection FLOOD_WAIT_{secs} —                                          waiting before retry"
2078                                    );
2079                                    sleep(Duration::from_secs(secs + 1)).await;
2080                                    // loop and retry init_connection
2081                                }
2082                                Err(e) => break Err(e),
2083                            }
2084                        };
2085                        if result.is_ok() {
2086                            // Replay any updates missed during the outage.
2087                            if let Ok(missed) = c.get_difference().await {
2088                                for u in missed { let _ = utx.send(u); }
2089                            }
2090                        }
2091                        let _ = init_tx.send(result);
2092                    });
2093                    return Some(init_rx);
2094                }
2095                Err(e) => {
2096                    log::warn!("[layer] Reconnect attempt failed: {e}");
2097                    // Cap at max, then apply ±20 % jitter to avoid thundering herd.
2098                    // Ensure the delay always advances by at least RECONNECT_BASE_MS
2099                    // so a 0 initial delay on the first attempt doesn't spin-loop.
2100                    let next = delay_ms.saturating_mul(2).clamp(RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1_000);
2101                    delay_ms = jitter_delay(next).as_millis() as u64;
2102                }
2103            }
2104        }
2105    }
2106
    /// Reconnect to the home DC, replace the writer, and return the new read half.
    ///
    /// Returns `(read_half, frame_kind, auth_key_bytes, session_id)` of the
    /// fresh connection. Tries the cached auth key first (skips DH) and falls
    /// back to a full raw connect + DH if that fails or no key is cached.
    async fn do_reconnect(
        &self,
        _old_auth_key: &[u8; 256],
        _old_frame_kind: &FrameKind,
    ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
        let home_dc_id = *self.inner.home_dc_id.lock().await;
        // Snapshot DC address/key/salt/time-offset while holding the lock,
        // then release it before any network I/O.
        let (addr, saved_key, first_salt, time_offset) = {
            let opts = self.inner.dc_options.lock().await;
            match opts.get(&home_dc_id) {
                Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
                // Hard-coded fallback address used when no entry for the home
                // DC is cached (fresh key, zero salt/offset).
                None    => ("149.154.167.51:443".to_string(), None, 0, 0),
            }
        };
        let socks5    = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();

        let new_conn = if let Some(key) = saved_key {
            log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
            match Connection::connect_with_key(
                &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
            ).await {
                Ok(c)   => c,
                // Saved key rejected/unusable — fall back to a fresh DH handshake.
                Err(e2) => {
                    log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
                    Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
                }
            }
        } else {
            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
        };

        // Swap the shared writer to the new connection; readers keep the
        // returned read half.
        let (new_writer, new_read, new_fk) = new_conn.into_writer();
        let new_ak  = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        *self.inner.writer.lock().await = new_writer;

        // NOTE: init_connection() is intentionally NOT called here.
        //
        // do_reconnect() is always called from inside the reader loop's select!,
        // which means the reader task is blocked while this function runs.
        // init_connection() sends an RPC and awaits the response — but only the
        // reader task can route that response back to the pending caller.
        // Calling it here creates a self-deadlock that times out after 30 s.
        //
        // Instead, callers are responsible for spawning init_connection() in a
        // separate task AFTER the reader loop has resumed and can process frames.

        Ok((new_read, new_fk, new_ak, new_sid))
    }
2157
2158    // ── Messaging ──────────────────────────────────────────────────────────
2159
2160    /// Send a text message. Use `"me"` for Saved Messages.
2161    pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
2162        let p = self.resolve_peer(peer).await?;
2163        self.send_message_to_peer(p, text).await
2164    }
2165
2166    /// Send a message to an already-resolved peer (plain text shorthand).
2167    pub async fn send_message_to_peer(
2168        &self,
2169        peer: tl::enums::Peer,
2170        text: &str,
2171    ) -> Result<(), InvocationError> {
2172        self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
2173    }
2174
2175    /// Send a message with full [`InputMessage`] options.
2176    pub async fn send_message_to_peer_ex(
2177        &self,
2178        peer: tl::enums::Peer,
2179        msg:  &InputMessage,
2180    ) -> Result<(), InvocationError> {
2181        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2182        let schedule = if msg.schedule_once_online {
2183            Some(0x7FFF_FFFEi32)
2184        } else {
2185            msg.schedule_date
2186        };
2187
2188        // G-45: if media is attached, route through SendMedia instead of SendMessage.
2189        if let Some(media) = &msg.media {
2190            let req = tl::functions::messages::SendMedia {
2191                silent:                   msg.silent,
2192                background:               msg.background,
2193                clear_draft:              msg.clear_draft,
2194                noforwards:               false,
2195                update_stickersets_order: false,
2196                invert_media:             msg.invert_media,
2197                allow_paid_floodskip:     false,
2198                peer:                     input_peer,
2199                reply_to:                 msg.reply_header(),
2200                media:                    media.clone(),
2201                message:                  msg.text.clone(),
2202                random_id:                random_i64(),
2203                reply_markup:             msg.reply_markup.clone(),
2204                entities:                 msg.entities.clone(),
2205                schedule_date:            schedule,
2206                schedule_repeat_period:   None,
2207                send_as:                  None,
2208                quick_reply_shortcut:     None,
2209                effect:                   None,
2210                allow_paid_stars:         None,
2211                suggested_post:           None,
2212            };
2213            return self.rpc_call_raw_pub(&req).await.map(|_| ());
2214        }
2215
2216        let req = tl::functions::messages::SendMessage {
2217            no_webpage:               msg.no_webpage,
2218            silent:                   msg.silent,
2219            background:               msg.background,
2220            clear_draft:              msg.clear_draft,
2221            noforwards:               false,
2222            update_stickersets_order: false,
2223            invert_media:             msg.invert_media,
2224            allow_paid_floodskip:     false,
2225            peer:                     input_peer,
2226            reply_to:                 msg.reply_header(),
2227            message:                  msg.text.clone(),
2228            random_id:                random_i64(),
2229            reply_markup:             msg.reply_markup.clone(),
2230            entities:                 msg.entities.clone(),
2231            schedule_date:            schedule,
2232            schedule_repeat_period:   None,
2233            send_as:                  None,
2234            quick_reply_shortcut:     None,
2235            effect:                   None,
2236            allow_paid_stars:         None,
2237            suggested_post:           None,
2238        };
2239        self.rpc_write(&req).await
2240    }
2241
2242    /// Send directly to Saved Messages.
2243    pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
2244        let req = tl::functions::messages::SendMessage {
2245            no_webpage:               false,
2246            silent:                   false,
2247            background:               false,
2248            clear_draft:              false,
2249            noforwards:               false,
2250            update_stickersets_order: false,
2251            invert_media:             false,
2252            allow_paid_floodskip:     false,
2253            peer:                     tl::enums::InputPeer::PeerSelf,
2254            reply_to:                 None,
2255            message:                  text.to_string(),
2256            random_id:                random_i64(),
2257            reply_markup:             None,
2258            entities:                 None,
2259            schedule_date:            None,
2260            schedule_repeat_period:   None,
2261            send_as:                  None,
2262            quick_reply_shortcut:     None,
2263            effect:                   None,
2264            allow_paid_stars:         None,
2265            suggested_post:           None,
2266        };
2267        self.rpc_write(&req).await
2268    }
2269
2270    /// Edit an existing message.
2271    pub async fn edit_message(
2272        &self,
2273        peer:       tl::enums::Peer,
2274        message_id: i32,
2275        new_text:   &str,
2276    ) -> Result<(), InvocationError> {
2277        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2278        let req = tl::functions::messages::EditMessage {
2279            no_webpage:    false,
2280            invert_media:  false,
2281            peer:          input_peer,
2282            id:            message_id,
2283            message:       Some(new_text.to_string()),
2284            media:         None,
2285            reply_markup:  None,
2286            entities:      None,
2287            schedule_date: None,
2288            schedule_repeat_period: None,
2289            quick_reply_shortcut_id: None,
2290        };
2291        self.rpc_write(&req).await
2292    }
2293
2294    /// Forward messages from `source` to `destination`.
2295    pub async fn forward_messages(
2296        &self,
2297        destination: tl::enums::Peer,
2298        message_ids: &[i32],
2299        source:      tl::enums::Peer,
2300    ) -> Result<(), InvocationError> {
2301        let cache = self.inner.peer_cache.read().await;
2302        let to_peer   = cache.peer_to_input(&destination);
2303        let from_peer = cache.peer_to_input(&source);
2304        drop(cache);
2305
2306        let req = tl::functions::messages::ForwardMessages {
2307            silent:            false,
2308            background:        false,
2309            with_my_score:     false,
2310            drop_author:       false,
2311            drop_media_captions: false,
2312            noforwards:        false,
2313            from_peer,
2314            id:                message_ids.to_vec(),
2315            random_id:         (0..message_ids.len()).map(|_| random_i64()).collect(),
2316            to_peer,
2317            top_msg_id:        None,
2318            reply_to:          None,
2319            schedule_date:     None,
2320            schedule_repeat_period: None,
2321            send_as:           None,
2322            quick_reply_shortcut: None,
2323            effect:            None,
2324            video_timestamp:   None,
2325            allow_paid_stars:  None,
2326            allow_paid_floodskip: false,
2327            suggested_post:    None,
2328        };
2329        self.rpc_write(&req).await
2330    }
2331
2332    /// Delete messages by ID.
2333    pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
2334        let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
2335        self.rpc_write(&req).await
2336    }
2337
2338    /// Get messages by their IDs from a peer.
2339    pub async fn get_messages_by_id(
2340        &self,
2341        peer: tl::enums::Peer,
2342        ids:  &[i32],
2343    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2344        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2345        let id_list: Vec<tl::enums::InputMessage> = ids.iter()
2346            .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
2347            .collect();
2348        let req  = tl::functions::channels::GetMessages {
2349            channel: match &input_peer {
2350                tl::enums::InputPeer::Channel(c) =>
2351                    tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2352                        channel_id: c.channel_id, access_hash: c.access_hash
2353                    }),
2354                _ => return self.get_messages_user(input_peer, id_list).await,
2355            },
2356            id: id_list,
2357        };
2358        let body    = self.rpc_call_raw(&req).await?;
2359        let mut cur = Cursor::from_slice(&body);
2360        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2361            tl::enums::messages::Messages::Messages(m) => m.messages,
2362            tl::enums::messages::Messages::Slice(m)    => m.messages,
2363            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2364            tl::enums::messages::Messages::NotModified(_) => vec![],
2365        };
2366        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2367    }
2368
2369    async fn get_messages_user(
2370        &self,
2371        _peer: tl::enums::InputPeer,
2372        ids:   Vec<tl::enums::InputMessage>,
2373    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2374        let req = tl::functions::messages::GetMessages { id: ids };
2375        let body    = self.rpc_call_raw(&req).await?;
2376        let mut cur = Cursor::from_slice(&body);
2377        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2378            tl::enums::messages::Messages::Messages(m) => m.messages,
2379            tl::enums::messages::Messages::Slice(m)    => m.messages,
2380            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2381            tl::enums::messages::Messages::NotModified(_) => vec![],
2382        };
2383        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2384    }
2385
2386    /// Get the pinned message in a chat.
2387    pub async fn get_pinned_message(
2388        &self,
2389        peer: tl::enums::Peer,
2390    ) -> Result<Option<update::IncomingMessage>, InvocationError> {
2391        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2392        let req = tl::functions::messages::Search {
2393            peer: input_peer,
2394            q: String::new(),
2395            from_id: None,
2396            saved_peer_id: None,
2397            saved_reaction: None,
2398            top_msg_id: None,
2399            filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
2400            min_date: 0,
2401            max_date: 0,
2402            offset_id: 0,
2403            add_offset: 0,
2404            limit: 1,
2405            max_id: 0,
2406            min_id: 0,
2407            hash: 0,
2408        };
2409        let body    = self.rpc_call_raw(&req).await?;
2410        let mut cur = Cursor::from_slice(&body);
2411        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2412            tl::enums::messages::Messages::Messages(m) => m.messages,
2413            tl::enums::messages::Messages::Slice(m)    => m.messages,
2414            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2415            tl::enums::messages::Messages::NotModified(_) => vec![],
2416        };
2417        Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
2418    }
2419
2420    /// Pin a message in a chat.
2421    pub async fn pin_message(
2422        &self,
2423        peer:       tl::enums::Peer,
2424        message_id: i32,
2425        silent:     bool,
2426        unpin:      bool,
2427        pm_oneside: bool,
2428    ) -> Result<(), InvocationError> {
2429        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2430        let req = tl::functions::messages::UpdatePinnedMessage {
2431            silent,
2432            unpin,
2433            pm_oneside,
2434            peer: input_peer,
2435            id:   message_id,
2436        };
2437        self.rpc_write(&req).await
2438    }
2439
2440    /// Unpin a specific message.
2441    pub async fn unpin_message(
2442        &self,
2443        peer:       tl::enums::Peer,
2444        message_id: i32,
2445    ) -> Result<(), InvocationError> {
2446        self.pin_message(peer, message_id, true, true, false).await
2447    }
2448
2449    /// Unpin all messages in a chat.
2450    pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2451        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2452        let req = tl::functions::messages::UnpinAllMessages {
2453            peer:      input_peer,
2454            top_msg_id: None,
2455            saved_peer_id: None,
2456        };
2457        self.rpc_write(&req).await
2458    }
2459
2460    // ── Message search ─────────────────────────────────────────────────────
2461
2462    /// Search messages in a chat (simple form).
2463    /// For advanced filtering use [`Client::search`] → [`SearchBuilder`].
2464    pub async fn search_messages(
2465        &self,
2466        peer:  tl::enums::Peer,
2467        query: &str,
2468        limit: i32,
2469    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2470        self.search(peer, query).limit(limit).fetch(self).await
2471    }
2472
2473    /// G-31: Fluent search builder for in-chat message search.
2474    pub fn search(&self, peer: tl::enums::Peer, query: &str) -> SearchBuilder {
2475        SearchBuilder::new(peer, query.to_string())
2476    }
2477
2478    /// Search globally (simple form). For filtering use [`Client::search_global_builder`].
2479    pub async fn search_global(
2480        &self,
2481        query: &str,
2482        limit: i32,
2483    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2484        self.search_global_builder(query).limit(limit).fetch(self).await
2485    }
2486
2487    /// G-32: Fluent builder for global cross-chat search.
2488    pub fn search_global_builder(&self, query: &str) -> GlobalSearchBuilder {
2489        GlobalSearchBuilder::new(query.to_string())
2490    }
2491
2492    // ── Scheduled messages ─────────────────────────────────────────────────
2493
2494    /// Retrieve all scheduled messages in a chat.
2495    ///
2496    /// Scheduled messages are messages set to be sent at a future time using
2497    /// [`InputMessage::schedule_date`].  Returns them newest-first.
2498    ///
2499    /// # Example
2500    /// ```rust,no_run
2501    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
2502    /// let scheduled = client.get_scheduled_messages(peer).await?;
2503    /// for msg in &scheduled {
2504    ///     println!("Scheduled: {:?} at {:?}", msg.text(), msg.date());
2505    /// }
2506    /// # Ok(()) }
2507    /// ```
2508    pub async fn get_scheduled_messages(
2509        &self,
2510        peer: tl::enums::Peer,
2511    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2512        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2513        let req = tl::functions::messages::GetScheduledHistory {
2514            peer: input_peer,
2515            hash: 0,
2516        };
2517        let body    = self.rpc_call_raw(&req).await?;
2518        let mut cur = Cursor::from_slice(&body);
2519        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2520            tl::enums::messages::Messages::Messages(m)        => m.messages,
2521            tl::enums::messages::Messages::Slice(m)           => m.messages,
2522            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2523            tl::enums::messages::Messages::NotModified(_)     => vec![],
2524        };
2525        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2526    }
2527
2528    /// Delete one or more scheduled messages by their IDs.
2529    pub async fn delete_scheduled_messages(
2530        &self,
2531        peer: tl::enums::Peer,
2532        ids:  Vec<i32>,
2533    ) -> Result<(), InvocationError> {
2534        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2535        let req = tl::functions::messages::DeleteScheduledMessages {
2536            peer: input_peer,
2537            id:   ids,
2538        };
2539        self.rpc_write(&req).await
2540    }
2541
2542    // ── Callback / Inline Queries ──────────────────────────────────────────
2543
2544    /// G-24: Edit an inline message by its [`InputBotInlineMessageId`].
2545    ///
2546    /// Inline messages live on the bot's home DC, not necessarily the current
2547    /// connection's DC.  This method sends the edit RPC on the correct DC by
2548    /// using the DC ID encoded in `msg_id` (high 20 bits of the `dc_id` field).
2549    ///
2550    /// # Example
2551    /// ```rust,no_run
2552    /// # async fn f(
2553    /// #   client: layer_client::Client,
2554    /// #   id: layer_tl_types::enums::InputBotInlineMessageId,
2555    /// # ) -> Result<(), Box<dyn std::error::Error>> {
2556    /// client.edit_inline_message(id, "new text", None).await?;
2557    /// # Ok(()) }
2558    /// ```
2559    pub async fn edit_inline_message(
2560        &self,
2561        id:           tl::enums::InputBotInlineMessageId,
2562        new_text:     &str,
2563        reply_markup: Option<tl::enums::ReplyMarkup>,
2564    ) -> Result<bool, InvocationError> {
2565        let req = tl::functions::messages::EditInlineBotMessage {
2566            no_webpage:   false,
2567            invert_media: false,
2568            id,
2569            message:      Some(new_text.to_string()),
2570            media:        None,
2571            reply_markup,
2572            entities:     None,
2573        };
2574        let body = self.rpc_call_raw(&req).await?;
2575        // Bool#997275b5 = boolTrue; Bool#bc799737 = boolFalse
2576        Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2577    }
2578
2579    /// Answer a callback query from an inline keyboard button press (bots only).
2580    pub async fn answer_callback_query(
2581        &self,
2582        query_id: i64,
2583        text:     Option<&str>,
2584        alert:    bool,
2585    ) -> Result<bool, InvocationError> {
2586        let req = tl::functions::messages::SetBotCallbackAnswer {
2587            alert,
2588            query_id,
2589            message:    text.map(|s| s.to_string()),
2590            url:        None,
2591            cache_time: 0,
2592        };
2593        let body = self.rpc_call_raw(&req).await?;
2594        Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2595    }
2596
2597    pub async fn answer_inline_query(
2598        &self,
2599        query_id:    i64,
2600        results:     Vec<tl::enums::InputBotInlineResult>,
2601        cache_time:  i32,
2602        is_personal: bool,
2603        next_offset: Option<String>,
2604    ) -> Result<bool, InvocationError> {
2605        let req = tl::functions::messages::SetInlineBotResults {
2606            gallery:        false,
2607            private:        is_personal,
2608            query_id,
2609            results,
2610            cache_time,
2611            next_offset,
2612            switch_pm:      None,
2613            switch_webview: None,
2614        };
2615        let body = self.rpc_call_raw(&req).await?;
2616        Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2617    }
2618
2619    // ── Dialogs ────────────────────────────────────────────────────────────
2620
2621    /// Fetch up to `limit` dialogs, most recent first. Populates entity/message.
2622    pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
2623        let req = tl::functions::messages::GetDialogs {
2624            exclude_pinned: false,
2625            folder_id:      None,
2626            offset_date:    0,
2627            offset_id:      0,
2628            offset_peer:    tl::enums::InputPeer::Empty,
2629            limit,
2630            hash:           0,
2631        };
2632
2633        let body    = self.rpc_call_raw(&req).await?;
2634        let mut cur = Cursor::from_slice(&body);
2635        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2636            tl::enums::messages::Dialogs::Dialogs(d) => d,
2637            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
2638                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2639            },
2640            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2641        };
2642
2643        // Build message map
2644        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2645            .map(|m| {
2646                let id = match &m {
2647                    tl::enums::Message::Message(x) => x.id,
2648                    tl::enums::Message::Service(x) => x.id,
2649                    tl::enums::Message::Empty(x)   => x.id,
2650                };
2651                (id, m)
2652            })
2653            .collect();
2654
2655        // Build user map
2656        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2657            .filter_map(|u| {
2658                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2659            })
2660            .collect();
2661
2662        // Build chat map
2663        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2664            .filter_map(|c| {
2665                let id = match &c {
2666                    tl::enums::Chat::Chat(x)             => x.id,
2667                    tl::enums::Chat::Forbidden(x)    => x.id,
2668                    tl::enums::Chat::Channel(x)          => x.id,
2669                    tl::enums::Chat::ChannelForbidden(x) => x.id,
2670                    tl::enums::Chat::Empty(x)            => x.id,
2671                };
2672                Some((id, c))
2673            })
2674            .collect();
2675
2676        // Cache peers for future access_hash lookups
2677        {
2678            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2679            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2680            self.cache_users_slice(&u_list).await;
2681            self.cache_chats_slice(&c_list).await;
2682        }
2683
2684        let result = raw.dialogs.into_iter().map(|d| {
2685            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2686            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2687
2688            let message = msg_map.get(&top_id).cloned();
2689            let entity = peer.and_then(|p| match p {
2690                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2691                _ => None,
2692            });
2693            let chat = peer.and_then(|p| match p {
2694                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
2695                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2696                _ => None,
2697            });
2698
2699            Dialog { raw: d, message, entity, chat }
2700        }).collect();
2701
2702        Ok(result)
2703    }
2704
2705    /// Internal helper: fetch dialogs with a custom GetDialogs request.
2706    #[allow(dead_code)]
2707    async fn get_dialogs_raw(
2708        &self,
2709        req: tl::functions::messages::GetDialogs,
2710    ) -> Result<Vec<Dialog>, InvocationError> {
2711        let body    = self.rpc_call_raw(&req).await?;
2712        let mut cur = Cursor::from_slice(&body);
2713        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2714            tl::enums::messages::Dialogs::Dialogs(d) => d,
2715            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
2716                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2717            },
2718            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2719        };
2720
2721        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2722            .map(|m| {
2723                let id = match &m {
2724                    tl::enums::Message::Message(x) => x.id,
2725                    tl::enums::Message::Service(x) => x.id,
2726                    tl::enums::Message::Empty(x)   => x.id,
2727                };
2728                (id, m)
2729            })
2730            .collect();
2731
2732        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2733            .filter_map(|u| {
2734                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2735            })
2736            .collect();
2737
2738        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2739            .filter_map(|c| {
2740                let id = match &c {
2741                    tl::enums::Chat::Chat(x)             => x.id,
2742                    tl::enums::Chat::Forbidden(x)    => x.id,
2743                    tl::enums::Chat::Channel(x)          => x.id,
2744                    tl::enums::Chat::ChannelForbidden(x) => x.id,
2745                    tl::enums::Chat::Empty(x)            => x.id,
2746                };
2747                Some((id, c))
2748            })
2749            .collect();
2750
2751        {
2752            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2753            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2754            self.cache_users_slice(&u_list).await;
2755            self.cache_chats_slice(&c_list).await;
2756        }
2757
2758        let result = raw.dialogs.into_iter().map(|d| {
2759            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2760            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2761
2762            let message = msg_map.get(&top_id).cloned();
2763            let entity = peer.and_then(|p| match p {
2764                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2765                _ => None,
2766            });
2767            let chat = peer.and_then(|p| match p {
2768                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
2769                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2770                _ => None,
2771            });
2772
2773            Dialog { raw: d, message, entity, chat }
2774        }).collect();
2775
2776        Ok(result)
2777    }
2778
    /// Like `get_dialogs_raw` but also returns the total count from `messages.DialogsSlice`.
    ///
    /// Pipeline: run the RPC, normalize the three `messages.Dialogs` variants
    /// into one struct, index messages/users/chats by id, cache the peers
    /// (so later calls carry correct access hashes), then assemble one
    /// [`Dialog`] per raw dialog with its top message and resolved peer.
    ///
    /// Returns `(dialogs, total)` where `total` is `Some` only for the
    /// `Slice` variant; `NotModified` yields `(vec![], None)`.
    async fn get_dialogs_raw_with_count(
        &self,
        req: tl::functions::messages::GetDialogs,
    ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
        let body    = self.rpc_call_raw(&req).await?;
        let mut cur = Cursor::from_slice(&body);
        // Normalize: Slice carries the same fields plus a count.
        let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
            tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
            tl::enums::messages::Dialogs::Slice(d)   => {
                let cnt = Some(d.count);
                (tl::types::messages::Dialogs {
                    dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
                }, cnt)
            }
            tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
        };

        // Index messages by id so each dialog can find its top message.
        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
            .filter_map(|m| {
                let id = match &m {
                    tl::enums::Message::Message(x) => x.id,
                    tl::enums::Message::Service(x) => x.id,
                    tl::enums::Message::Empty(x)   => x.id,
                };
                Some((id, m))
            }).collect();

        // Index users by id; the UserEmpty variant is deliberately dropped.
        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
            .filter_map(|u| {
                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
            }).collect();

        // Index chats/channels (including forbidden/empty variants) by id.
        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
            .filter_map(|c| {
                let id = match &c {
                    tl::enums::Chat::Chat(x)             => x.id,
                    tl::enums::Chat::Forbidden(x)        => x.id,
                    tl::enums::Chat::Channel(x)          => x.id,
                    tl::enums::Chat::ChannelForbidden(x) => x.id,
                    tl::enums::Chat::Empty(x)            => x.id,
                };
                Some((id, c))
            }).collect();

        // Cache peers BEFORE returning so subsequent API calls can look up
        // access hashes for everything this response mentioned.
        {
            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
            self.cache_users_slice(&u_list).await;
            self.cache_chats_slice(&c_list).await;
        }

        // Assemble: attach top message plus either the user entity or the
        // chat/channel, depending on the dialog's peer kind.
        let result = raw.dialogs.into_iter().map(|d| {
            // Non-Dialog variants fall back to id 0, which never matches.
            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
            let message = msg_map.get(&top_id).cloned();
            let entity = peer.and_then(|p| match p {
                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
                _ => None,
            });
            let chat = peer.and_then(|p| match p {
                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
                _ => None,
            });
            Dialog { raw: d, message, entity, chat }
        }).collect();

        Ok((result, count))
    }
2849
2850    /// Like `get_messages` but also returns the total count from `messages.Slice`.
2851    async fn get_messages_with_count(
2852        &self,
2853        peer:      tl::enums::InputPeer,
2854        limit:     i32,
2855        offset_id: i32,
2856    ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
2857        let req = tl::functions::messages::GetHistory {
2858            peer, offset_id, offset_date: 0, add_offset: 0,
2859            limit, max_id: 0, min_id: 0, hash: 0,
2860        };
2861        let body    = self.rpc_call_raw(&req).await?;
2862        let mut cur = Cursor::from_slice(&body);
2863        let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2864            tl::enums::messages::Messages::Messages(m) => (m.messages, None),
2865            tl::enums::messages::Messages::Slice(m)    => {
2866                let cnt = Some(m.count);
2867                (m.messages, cnt)
2868            }
2869            tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
2870            tl::enums::messages::Messages::NotModified(_) => (vec![], None),
2871        };
2872        Ok((msgs.into_iter().map(update::IncomingMessage::from_raw).collect(), count))
2873    }
2874
2875    /// Download all bytes of a media attachment and save them to `path`.
2876    ///
2877    /// # Example
2878    /// ```rust,no_run
2879    /// # async fn f(client: layer_client::Client, msg: layer_client::update::IncomingMessage) -> Result<(), Box<dyn std::error::Error>> {
2880    /// if let Some(loc) = msg.download_location() {
2881    ///     client.download_media_to_file(loc, "/tmp/file.jpg").await?;
2882    /// }
2883    /// # Ok(()) }
2884    /// ```
2885    pub async fn download_media_to_file(
2886        &self,
2887        location: tl::enums::InputFileLocation,
2888        path:     impl AsRef<std::path::Path>,
2889    ) -> Result<(), InvocationError> {
2890        let bytes = self.download_media(location).await?;
2891        std::fs::write(path, &bytes).map_err(InvocationError::Io)?;
2892        Ok(())
2893    }
2894
2895    pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2896        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2897        let req = tl::functions::messages::DeleteHistory {
2898            just_clear:  false,
2899            revoke:      false,
2900            peer:        input_peer,
2901            max_id:      0,
2902            min_date:    None,
2903            max_date:    None,
2904        };
2905        self.rpc_write(&req).await
2906    }
2907
2908    /// Mark all messages in a chat as read.
2909    pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2910        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2911        match &input_peer {
2912            tl::enums::InputPeer::Channel(c) => {
2913                let req = tl::functions::channels::ReadHistory {
2914                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2915                        channel_id: c.channel_id, access_hash: c.access_hash,
2916                    }),
2917                    max_id: 0,
2918                };
2919                self.rpc_call_raw(&req).await?;
2920            }
2921            _ => {
2922                let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
2923                self.rpc_call_raw(&req).await?;
2924            }
2925        }
2926        Ok(())
2927    }
2928
2929    /// Clear unread mention markers.
2930    pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2931        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2932        let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
2933        self.rpc_write(&req).await
2934    }
2935
2936    // ── Chat actions (typing, etc) ─────────────────────────────────────────
2937
    /// Send a chat action (typing indicator, uploading photo, etc).
    ///
    /// For "typing" use `tl::enums::SendMessageAction::Typing`.
    /// For forum topic support use [`send_chat_action_ex`](Self::send_chat_action_ex)
    /// or the [`typing_in_topic`](Self::typing_in_topic) helper.
    pub async fn send_chat_action(
        &self,
        peer:   tl::enums::Peer,
        action: tl::enums::SendMessageAction,
    ) -> Result<(), InvocationError> {
        // Thin wrapper: same call with no topic (`None`) targeting.
        self.send_chat_action_ex(peer, action, None).await
    }
2950
2951    // ── Join / invite links ────────────────────────────────────────────────
2952
2953    /// Join a public chat or channel by username/peer.
2954    pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2955        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2956        match input_peer {
2957            tl::enums::InputPeer::Channel(c) => {
2958                let req = tl::functions::channels::JoinChannel {
2959                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2960                        channel_id: c.channel_id, access_hash: c.access_hash,
2961                    }),
2962                };
2963                self.rpc_call_raw(&req).await?;
2964            }
2965            tl::enums::InputPeer::Chat(c) => {
2966                let req = tl::functions::messages::AddChatUser {
2967                    chat_id:  c.chat_id,
2968                    user_id:  tl::enums::InputUser::UserSelf,
2969                    fwd_limit: 0,
2970                };
2971                self.rpc_call_raw(&req).await?;
2972            }
2973            _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
2974        }
2975        Ok(())
2976    }
2977
2978    /// Accept and join via an invite link.
2979    pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
2980        let hash = Self::parse_invite_hash(link)
2981            .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
2982        let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
2983        self.rpc_write(&req).await
2984    }
2985
2986    /// Extract hash from `https://t.me/+HASH` or `https://t.me/joinchat/HASH`.
2987    pub fn parse_invite_hash(link: &str) -> Option<&str> {
2988        if let Some(pos) = link.find("/+") {
2989            return Some(&link[pos + 2..]);
2990        }
2991        if let Some(pos) = link.find("/joinchat/") {
2992            return Some(&link[pos + 10..]);
2993        }
2994        None
2995    }
2996
2997    // ── Message history (paginated) ────────────────────────────────────────
2998
2999    /// Fetch a page of messages from a peer's history.
3000    pub async fn get_messages(
3001        &self,
3002        peer:      tl::enums::InputPeer,
3003        limit:     i32,
3004        offset_id: i32,
3005    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
3006        let req = tl::functions::messages::GetHistory {
3007            peer, offset_id, offset_date: 0, add_offset: 0,
3008            limit, max_id: 0, min_id: 0, hash: 0,
3009        };
3010        let body    = self.rpc_call_raw(&req).await?;
3011        let mut cur = Cursor::from_slice(&body);
3012        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
3013            tl::enums::messages::Messages::Messages(m) => m.messages,
3014            tl::enums::messages::Messages::Slice(m)    => m.messages,
3015            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
3016            tl::enums::messages::Messages::NotModified(_) => vec![],
3017        };
3018        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
3019    }
3020
3021    // ── Peer resolution ────────────────────────────────────────────────────
3022
3023    /// Resolve a peer string to a [`tl::enums::Peer`].
3024    pub async fn resolve_peer(
3025        &self,
3026        peer: &str,
3027    ) -> Result<tl::enums::Peer, InvocationError> {
3028        match peer.trim() {
3029            "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
3030            username if username.starts_with('@') => {
3031                self.resolve_username(&username[1..]).await
3032            }
3033            id_str => {
3034                if let Ok(id) = id_str.parse::<i64>() {
3035                    Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
3036                } else {
3037                    Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
3038                }
3039            }
3040        }
3041    }
3042
3043    /// Resolve a Telegram username to a [`tl::enums::Peer`] and cache the access hash.
3044    ///
3045    /// Also accepts usernames without the leading `@`.
3046    pub async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
3047        let req  = tl::functions::contacts::ResolveUsername {
3048            username: username.to_string(), referer: None,
3049        };
3050        let body = self.rpc_call_raw(&req).await?;
3051        let mut cur = Cursor::from_slice(&body);
3052        let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
3053            tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
3054        };
3055        // Cache users and chats from the resolution
3056        self.cache_users_slice(&resolved.users).await;
3057        self.cache_chats_slice(&resolved.chats).await;
3058        Ok(resolved.peer)
3059    }
3060
3061    // ── Raw invoke ─────────────────────────────────────────────────────────
3062
3063    /// Invoke any TL function directly, handling flood-wait retries.
3064    pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
3065        let body = self.rpc_call_raw(req).await?;
3066        let mut cur = Cursor::from_slice(&body);
3067        R::Return::deserialize(&mut cur).map_err(Into::into)
3068    }
3069
3070    async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
3071        let mut fail_count   = NonZeroU32::new(1).unwrap();
3072        let mut slept_so_far = Duration::default();
3073        loop {
3074            match self.do_rpc_call(req).await {
3075                Ok(body) => return Ok(body),
3076                Err(e) => {
3077                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
3078                    match self.inner.retry_policy.should_retry(&ctx) {
3079                        ControlFlow::Continue(delay) => {
3080                            sleep(delay).await;
3081                            slept_so_far += delay;
3082                            fail_count = fail_count.saturating_add(1);
3083                        }
3084                        ControlFlow::Break(()) => return Err(ctx.error),
3085                    }
3086                }
3087            }
3088        }
3089    }
3090
    /// Send an RPC call and await the response via a oneshot channel.
    ///
    /// This is the core of the split-stream design:
    ///  1. Pack the request and get its msg_id.
    ///  2. Register a oneshot Sender in the pending map (BEFORE sending).
    ///  3. Send the frame while holding the writer lock.
    ///  4. Release the writer lock immediately — the reader task now runs freely.
    ///  5. Await the oneshot Receiver; the reader task will fulfill it when
    ///     the matching rpc_result frame arrives.
    ///
    /// The response is awaited with a hard 30-second timeout; a closed
    /// channel (reader task gone) and a timeout both surface as
    /// `InvocationError::Deserialize` with a descriptive message.
    async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
        let (tx, rx) = oneshot::channel();
        // Writer-lock scope: everything inside this block holds the writer
        // mutex; it must end before we await the response below.
        {
            let raw_body = req.to_bytes();
            // G-06: compress large outgoing bodies
            let body = maybe_gz_pack(&raw_body);

            let mut w = self.inner.writer.lock().await;
            let fk = w.frame_kind.clone();

            // G-04 + G-07: drain any pending acks; if non-empty bundle them with
            // the request in a MessageContainer so acks piggyback on every send.
            let acks: Vec<i64> = w.pending_ack.drain(..).collect();

            if acks.is_empty() {
                // ── Simple path: standalone request ──────────────────────────
                let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
                w.sent_bodies.insert(msg_id, body); // G-02
                // Register BEFORE sending: the reader could otherwise see the
                // reply before the pending entry exists and drop it.
                self.inner.pending.lock().await.insert(msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            } else {
                // ── G-07 container path: [MsgsAck, request] ─────────────────
                // Build MsgsAck inner body
                let ack_body = build_msgs_ack_body(&acks);
                // Allocate inner msg_id+seqno for each item
                let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false); // non-content
                let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);  // content

                // Build container payload
                let container_payload = build_container_body(&[
                    (ack_msg_id, ack_seqno, ack_body.as_slice()),
                    (req_msg_id, req_seqno, body.as_slice()),
                ]);

                // Encrypt the container as a non-content-related outer message
                let wire = w.enc.pack_container(&container_payload);

                w.sent_bodies.insert(req_msg_id, body); // G-02
                // The reply is keyed by the INNER request msg_id, not the
                // container's outer msg_id.
                self.inner.pending.lock().await.insert(req_msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
                log::trace!("[layer] G-07 container: bundled {} acks + request", acks.len());
            }
        }
        // Writer lock released; await the reader task's routed response.
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            Ok(Ok(result)) => result,
            Ok(Err(_))     => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
            Err(_)         => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
        }
    }
3149
3150    /// Like `rpc_call_raw` but for write RPCs (Serializable, return type is Updates).
3151    /// Uses the same oneshot mechanism — the reader task signals success/failure.
3152    async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
3153        let mut fail_count   = NonZeroU32::new(1).unwrap();
3154        let mut slept_so_far = Duration::default();
3155        loop {
3156            let result = self.do_rpc_write(req).await;
3157            match result {
3158                Ok(()) => return Ok(()),
3159                Err(e) => {
3160                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
3161                    match self.inner.retry_policy.should_retry(&ctx) {
3162                        ControlFlow::Continue(delay) => {
3163                            sleep(delay).await;
3164                            slept_so_far += delay;
3165                            fail_count = fail_count.saturating_add(1);
3166                        }
3167                        ControlFlow::Break(()) => return Err(ctx.error),
3168                    }
3169                }
3170            }
3171        }
3172    }
3173
    /// One attempt of a write RPC: pack, send (optionally bundled with
    /// pending acks in a container), and await the reader-routed reply.
    ///
    /// Mirrors `do_rpc_call` exactly, except the response payload is
    /// discarded — only success/failure is reported. Same 30-second timeout.
    async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
        let (tx, rx) = oneshot::channel();
        // Writer-lock scope: released before awaiting the response below.
        {
            let raw_body = req.to_bytes();
            // G-06: compress large outgoing bodies
            let body = maybe_gz_pack(&raw_body);

            let mut w = self.inner.writer.lock().await;
            let fk = w.frame_kind.clone();

            // G-04 + G-07: drain pending acks and bundle into container if any
            let acks: Vec<i64> = w.pending_ack.drain(..).collect();

            if acks.is_empty() {
                let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
                w.sent_bodies.insert(msg_id, body); // G-02
                // Register the oneshot BEFORE sending so the reader can
                // always find it when the reply arrives.
                self.inner.pending.lock().await.insert(msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            } else {
                let ack_body = build_msgs_ack_body(&acks);
                let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
                let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
                let container_payload = build_container_body(&[
                    (ack_msg_id, ack_seqno, ack_body.as_slice()),
                    (req_msg_id, req_seqno, body.as_slice()),
                ]);
                let wire = w.enc.pack_container(&container_payload);
                w.sent_bodies.insert(req_msg_id, body); // G-02
                // Keyed by the INNER request msg_id, not the container's.
                self.inner.pending.lock().await.insert(req_msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
                log::trace!("[layer] G-07 write container: bundled {} acks + write", acks.len());
            }
        }
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            // Drop the payload: callers of rpc_write only care about success.
            Ok(Ok(result)) => result.map(|_| ()),
            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
            Err(_)         => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
        }
    }
3213
3214    // ── initConnection ─────────────────────────────────────────────────────
3215
    /// Send `invokeWithLayer(initConnection(help.getConfig))` and refresh the
    /// DC address table from the returned config.
    ///
    /// Called after connecting and after DC migration (see `migrate_to`).
    /// Piggybacking `help.getConfig` lets one round-trip both announce our
    /// layer/client info and update known DC addresses.
    async fn init_connection(&self) -> Result<(), InvocationError> {
        use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
        let req = InvokeWithLayer {
            layer: tl::LAYER,
            query: InitConnection {
                api_id:           self.inner.api_id,
                device_model:     "Linux".to_string(),
                system_version:   "1.0".to_string(),
                app_version:      env!("CARGO_PKG_VERSION").to_string(),
                system_lang_code: "en".to_string(),
                lang_pack:        "".to_string(),
                lang_code:        "en".to_string(),
                proxy:            None,
                params:           None,
                query:            GetConfig {},
            },
        };

        // Use the split-writer oneshot path (reader task routes the response).
        let body = self.rpc_call_raw_serializable(&req).await?;

        // Best-effort parse: if the Config body fails to deserialize we keep
        // the existing DC table rather than failing the whole connection.
        let mut cur = Cursor::from_slice(&body);
        if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
            let allow_ipv6 = self.inner.allow_ipv6;
            let mut opts = self.inner.dc_options.lock().await;
            for opt in &cfg.dc_options {
                let tl::enums::DcOption::DcOption(o) = opt;
                // Skip endpoints we never dial directly (media-only, CDN,
                // TCP-obfuscated-only) and IPv6 entries when disabled.
                if o.media_only || o.cdn || o.tcpo_only { continue; }
                if o.ipv6 && !allow_ipv6 { continue; }
                let addr = format!("{}:{}", o.ip_address, o.port);
                // Insert new DCs without an auth key; for known DCs only the
                // address is refreshed, preserving any cached auth key.
                let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
                    dc_id: o.id, addr: addr.clone(),
                    auth_key: None, first_salt: 0, time_offset: 0,
                });
                entry.addr = addr;
            }
            log::info!("[layer] initConnection ✓  ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
        }
        Ok(())
    }
3256
3257    // ── DC migration ───────────────────────────────────────────────────────
3258
    /// Tear down the current connection and reconnect to `new_dc_id`,
    /// reusing a cached auth key for that DC when available.
    ///
    /// Steps: resolve the DC address, connect (full handshake or resumed
    /// key), persist the new auth key, swap the writer, hand the read half
    /// to the reader task, then run `init_connection` on the new DC before
    /// returning so the caller can safely retry its original request.
    async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
        let addr = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&new_dc_id).map(|e| e.addr.clone())
                // Fallback address when the DC is unknown — NOTE(review):
                // hard-coded; presumably a well-known production DC. Confirm.
                .unwrap_or_else(|| "149.154.167.51:443".to_string())
        };
        log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");

        // Reuse a previously negotiated auth key for this DC if we have one;
        // otherwise a full key exchange is performed by connect_raw.
        let saved_key = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&new_dc_id).and_then(|e| e.auth_key)
        };

        let socks5    = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();
        let conn = if let Some(key) = saved_key {
            Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
        } else {
            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
        };

        // Persist the (possibly new) auth key into the DC table so future
        // migrations to this DC skip the key exchange.
        let new_key = conn.auth_key_bytes();
        {
            let mut opts = self.inner.dc_options.lock().await;
            let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
                dc_id: new_dc_id, addr: addr.clone(),
                auth_key: None, first_salt: 0, time_offset: 0,
            });
            entry.auth_key = Some(new_key);
        }

        // Split the new connection and replace writer + reader.
        let (new_writer, new_read, new_fk) = conn.into_writer();
        let new_ak  = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        *self.inner.writer.lock().await = new_writer;
        *self.inner.home_dc_id.lock().await = new_dc_id;

        // Hand the new read half to the reader task FIRST so it can route
        // the upcoming init_connection RPC response.
        let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

        // migrate_to() is called from user-facing methods (bot_sign_in,
        // request_login_code, sign_in) — NOT from inside the reader loop.
        // The reader task is a separate tokio task running concurrently, so
        // awaiting init_connection() here is safe: the reader is free to route
        // the RPC response while we wait. We must await before returning so
        // the caller can safely retry the original request on the new DC.
        //
        // Respect FLOOD_WAIT: if Telegram rate-limits init, wait and retry
        // rather than returning an error that would abort the whole auth flow.
        loop {
            match self.init_connection().await {
                Ok(()) => break,
                Err(InvocationError::Rpc(ref r))
                    if r.flood_wait_seconds().is_some() =>
                {
                    let secs = r.flood_wait_seconds().unwrap();
                    log::warn!(
                        "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs} — waiting"
                    );
                    // +1 s of slack so we don't retry a hair too early.
                    sleep(Duration::from_secs(secs + 1)).await;
                }
                Err(e) => return Err(e),
            }
        }

        // Persist best-effort; a failed save shouldn't fail the migration.
        self.save_session().await.ok();
        log::info!("[layer] Now on DC{new_dc_id} ✓");
        Ok(())
    }
3330
    // ── G-34: Graceful shutdown ────────────────────────────────────────────

    /// Gracefully shut down the client.
    ///
    /// Signals the reader task to exit cleanly. Equivalent to cancelling the
    /// [`ShutdownToken`] returned from [`Client::connect`].
    ///
    /// In-flight RPCs will receive a `Dropped` error. Call `save_session()`
    /// before this if you want to persist the current auth state.
    pub fn disconnect(&self) {
        // Cooperative shutdown: the reader task observes the token and exits.
        self.inner.shutdown_token.cancel();
    }
3343
    // ── G-54: Expose sync_update_state publicly ────────────────────────────

    /// Sync the internal pts/qts/seq/date state with the Telegram server.
    ///
    /// This is called automatically on `connect()`. Call it manually if you
    /// need to reset the update gap-detection counters, e.g. after resuming
    /// from a long hibernation.
    pub async fn sync_update_state(&self) {
        // Best-effort: any error from the resync is deliberately discarded.
        let _ = self.sync_pts_state().await;
    }
3354
3355    // ── Cache helpers ──────────────────────────────────────────────────────
3356
3357    async fn cache_user(&self, user: &tl::enums::User) {
3358        self.inner.peer_cache.write().await.cache_user(user);
3359    }
3360
3361    async fn cache_users_slice(&self, users: &[tl::enums::User]) {
3362        let mut cache = self.inner.peer_cache.write().await;
3363        cache.cache_users(users);
3364    }
3365
3366    async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
3367        let mut cache = self.inner.peer_cache.write().await;
3368        cache.cache_chats(chats);
3369    }
3370
    // Public versions used by sub-modules (media.rs, participants.rs, pts.rs)
    #[doc(hidden)]
    /// Doc-hidden forwarder to the private `cache_users_slice`, so sibling
    /// modules can populate the peer cache without it becoming public API.
    pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
        self.cache_users_slice(users).await;
    }
3376
    #[doc(hidden)]
    /// Doc-hidden forwarder to the private `cache_chats_slice`, so sibling
    /// modules can populate the peer cache without it becoming public API.
    pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
        self.cache_chats_slice(chats).await;
    }
3381
    /// Public RPC call for use by sub-modules.
    ///
    /// Thin doc-hidden forwarder to the private `rpc_call_raw`; returns the
    /// raw response bytes for the caller to deserialize.
    #[doc(hidden)]
    pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
        self.rpc_call_raw(req).await
    }
3387
3388    /// Like rpc_call_raw but takes a Serializable (for InvokeWithLayer wrappers).
3389    async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
3390        let mut fail_count   = NonZeroU32::new(1).unwrap();
3391        let mut slept_so_far = Duration::default();
3392        loop {
3393            match self.do_rpc_write_returning_body(req).await {
3394                Ok(body) => return Ok(body),
3395                Err(e) => {
3396                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
3397                    match self.inner.retry_policy.should_retry(&ctx) {
3398                        ControlFlow::Continue(delay) => {
3399                            sleep(delay).await;
3400                            slept_so_far += delay;
3401                            fail_count = fail_count.saturating_add(1);
3402                        }
3403                        ControlFlow::Break(()) => return Err(ctx.error),
3404                    }
3405                }
3406            }
3407        }
3408    }
3409
    /// Serialize `req`, frame it on the shared writer, and await the reply body.
    ///
    /// Any msg_ids waiting to be acked (G-04) are drained and bundled together
    /// with the request into one MTProto container, so acks never cost an
    /// extra round-trip. The raw body is remembered in `sent_bodies` (G-02) so
    /// a bad_msg_notification can transparently trigger a re-send, and the
    /// oneshot sender is registered in `pending` *before* the frame is written
    /// so the reader task can always route the response.
    ///
    /// Returns the raw response bytes, or an error after a 30-second timeout.
    async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
        let (tx, rx) = oneshot::channel();
        {
            let raw_body = req.to_bytes();
            let body = maybe_gz_pack(&raw_body); // G-06
            let mut w = self.inner.writer.lock().await;
            let fk = w.frame_kind.clone();
            let acks: Vec<i64> = w.pending_ack.drain(..).collect(); // G-04+G-07
            if acks.is_empty() {
                // No pending acks: send the request as a bare message.
                let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
                w.sent_bodies.insert(msg_id, body); // G-02
                self.inner.pending.lock().await.insert(msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            } else {
                // Bundle MsgsAck + request into a single container frame.
                let ack_body = build_msgs_ack_body(&acks);
                let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
                let (req_msg_id, req_seqno)  = w.enc.alloc_msg_seqno(true);
                let container_payload = build_container_body(&[
                    (ack_msg_id, ack_seqno, ack_body.as_slice()),
                    (req_msg_id, req_seqno, body.as_slice()),
                ]);
                let wire = w.enc.pack_container(&container_payload);
                w.sent_bodies.insert(req_msg_id, body); // G-02
                self.inner.pending.lock().await.insert(req_msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            }
        }
        // Writer lock released; wait for the reader task to route the reply.
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            Ok(Ok(result)) => result,
            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc channel closed".into())),
            Err(_)         => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
        }
    }
3443
3444    // ── Paginated dialog iterator ──────────────────────────────────────────
3445
3446    /// Fetch dialogs page by page.
3447    ///
3448    /// Returns a [`DialogIter`] that can be advanced with [`DialogIter::next`].
3449    /// This lets you page through all dialogs without loading them all at once.
3450    ///
3451    /// # Example
3452    /// ```rust,no_run
3453    /// # async fn f(client: layer_client::Client) -> Result<(), Box<dyn std::error::Error>> {
3454    /// let mut iter = client.iter_dialogs();
3455    /// while let Some(dialog) = iter.next(&client).await? {
3456    ///     println!("{}", dialog.title());
3457    /// }
3458    /// # Ok(()) }
3459    /// ```
3460    pub fn iter_dialogs(&self) -> DialogIter {
3461        DialogIter {
3462            offset_date: 0,
3463            offset_id:   0,
3464            offset_peer: tl::enums::InputPeer::Empty,
3465            done:        false,
3466            buffer:      VecDeque::new(),
3467            total:       None,
3468        }
3469    }
3470
3471    /// Fetch messages from a peer, page by page.
3472    ///
3473    /// Returns a [`MessageIter`] that can be advanced with [`MessageIter::next`].
3474    ///
3475    /// # Example
3476    /// ```rust,no_run
3477    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
3478    /// let mut iter = client.iter_messages(peer);
3479    /// while let Some(msg) = iter.next(&client).await? {
3480    ///     println!("{:?}", msg.text());
3481    /// }
3482    /// # Ok(()) }
3483    /// ```
3484    pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
3485        MessageIter {
3486            peer,
3487            offset_id: 0,
3488            done:      false,
3489            buffer:    VecDeque::new(),
3490            total:     None,
3491        }
3492    }
3493
3494    // ── resolve_peer helper returning Result on unknown hash ───────────────
3495
3496    /// Try to resolve a peer to InputPeer, returning an error if the access_hash
3497    /// is unknown (i.e. the peer has not been seen in any prior API call).
3498    pub async fn resolve_to_input_peer(
3499        &self,
3500        peer: &tl::enums::Peer,
3501    ) -> Result<tl::enums::InputPeer, InvocationError> {
3502        let cache = self.inner.peer_cache.read().await;
3503        match peer {
3504            tl::enums::Peer::User(u) => {
3505                if u.user_id == 0 {
3506                    return Ok(tl::enums::InputPeer::PeerSelf);
3507                }
3508                match cache.users.get(&u.user_id) {
3509                    Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
3510                        user_id: u.user_id, access_hash: hash,
3511                    })),
3512                    None => Err(InvocationError::Deserialize(format!(
3513                        "access_hash unknown for user {}; resolve via username first", u.user_id
3514                    ))),
3515                }
3516            }
3517            tl::enums::Peer::Chat(c) => {
3518                Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
3519            }
3520            tl::enums::Peer::Channel(c) => {
3521                match cache.channels.get(&c.channel_id) {
3522                    Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
3523                        channel_id: c.channel_id, access_hash: hash,
3524                    })),
3525                    None => Err(InvocationError::Deserialize(format!(
3526                        "access_hash unknown for channel {}; resolve via username first", c.channel_id
3527                    ))),
3528                }
3529            }
3530        }
3531    }
3532
3533    // ── Multi-DC pool ──────────────────────────────────────────────────────
3534
3535    /// Invoke a request on a specific DC, using the pool.
3536    ///
3537    /// If the target DC has no auth key yet, one is acquired via DH and then
3538    /// authorized via `auth.exportAuthorization` / `auth.importAuthorization`
3539    /// so the worker DC can serve user-account requests too.
3540    pub async fn invoke_on_dc<R: RemoteCall>(
3541        &self,
3542        dc_id: i32,
3543        req:   &R,
3544    ) -> Result<R::Return, InvocationError> {
3545        let body = self.rpc_on_dc_raw(dc_id, req).await?;
3546        let mut cur = Cursor::from_slice(&body);
3547        R::Return::deserialize(&mut cur).map_err(Into::into)
3548    }
3549
3550    /// Raw RPC call routed to `dc_id`, exporting auth if needed.
3551    async fn rpc_on_dc_raw<R: RemoteCall>(
3552        &self,
3553        dc_id: i32,
3554        req:   &R,
3555    ) -> Result<Vec<u8>, InvocationError> {
3556        // Check if we need to open a new connection for this DC
3557        let needs_new = {
3558            let pool = self.inner.dc_pool.lock().await;
3559            !pool.has_connection(dc_id)
3560        };
3561
3562        if needs_new {
3563            let addr = {
3564                let opts = self.inner.dc_options.lock().await;
3565                opts.get(&dc_id).map(|e| e.addr.clone())
3566                    .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
3567            };
3568
3569            let socks5    = self.inner.socks5.clone();
3570            let transport = self.inner.transport.clone();
3571            let saved_key = {
3572                let opts = self.inner.dc_options.lock().await;
3573                opts.get(&dc_id).and_then(|e| e.auth_key)
3574            };
3575
3576            let dc_conn = if let Some(key) = saved_key {
3577                dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
3578            } else {
3579                let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
3580                // Export auth from home DC and import into worker DC
3581                let home_dc_id = *self.inner.home_dc_id.lock().await;
3582                if dc_id != home_dc_id {
3583                    if let Err(e) = self.export_import_auth(dc_id, &conn).await {
3584                        log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
3585                    }
3586                }
3587                conn
3588            };
3589
3590            let key = dc_conn.auth_key_bytes();
3591            {
3592                let mut opts = self.inner.dc_options.lock().await;
3593                if let Some(e) = opts.get_mut(&dc_id) {
3594                    e.auth_key = Some(key);
3595                }
3596            }
3597            self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
3598        }
3599
3600        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
3601        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
3602    }
3603
3604    /// Export authorization from the home DC and import it into `dc_id`.
3605    async fn export_import_auth(
3606        &self,
3607        dc_id:   i32,
3608        _dc_conn: &dc_pool::DcConnection, // reserved for future direct import
3609    ) -> Result<(), InvocationError> {
3610        // Export from home DC
3611        let export_req = tl::functions::auth::ExportAuthorization { dc_id };
3612        let body    = self.rpc_call_raw(&export_req).await?;
3613        let mut cur = Cursor::from_slice(&body);
3614        let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
3615            tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
3616        };
3617
3618        // Import into the target DC via the pool
3619        let import_req = tl::functions::auth::ImportAuthorization {
3620            id:    exported.id,
3621            bytes: exported.bytes,
3622        };
3623        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
3624        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
3625        log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
3626        Ok(())
3627    }
3628
3629    // ── Private helpers ────────────────────────────────────────────────────
3630
3631    async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
3632        let body    = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
3633        let mut cur = Cursor::from_slice(&body);
3634        let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
3635            tl::enums::account::Password::Password(p) => p,
3636        };
3637        Ok(PasswordToken { password: pw })
3638    }
3639
3640    fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
3641        tl::functions::auth::SendCode {
3642            phone_number: phone.to_string(),
3643            api_id:       self.inner.api_id,
3644            api_hash:     self.inner.api_hash.clone(),
3645            settings:     tl::enums::CodeSettings::CodeSettings(
3646                tl::types::CodeSettings {
3647                    allow_flashcall: false, current_number: false, allow_app_hash: false,
3648                    allow_missed_call: false, allow_firebase: false, unknown_number: false,
3649                    logout_tokens: None, token: None, app_sandbox: None,
3650                },
3651            ),
3652        }
3653    }
3654
3655    fn extract_user_name(user: &tl::enums::User) -> String {
3656        match user {
3657            tl::enums::User::User(u) => {
3658                format!("{} {}",
3659                    u.first_name.as_deref().unwrap_or(""),
3660                    u.last_name.as_deref().unwrap_or(""))
3661                    .trim().to_string()
3662            }
3663            tl::enums::User::Empty(_) => "(unknown)".into(),
3664        }
3665    }
3666
3667    fn extract_password_params(
3668        algo: &tl::enums::PasswordKdfAlgo,
3669    ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
3670        match algo {
3671            tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
3672                Ok((&a.salt1, &a.salt2, &a.p, a.g))
3673            }
3674            _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
3675        }
3676    }
3677}
3678
// ─── Paginated iterators ──────────────────────────────────────────────────────

/// Cursor-based iterator over dialogs. Created by [`Client::iter_dialogs`].
pub struct DialogIter {
    // Pagination cursor: (date, id, peer) taken from the last dialog of the
    // previous page; all zero/empty before the first fetch.
    offset_date: i32,
    offset_id:   i32,
    offset_peer: tl::enums::InputPeer,
    // Set once the server returns a short or empty page; no further requests.
    done:        bool,
    // Dialogs already fetched but not yet yielded by `next`.
    buffer:      VecDeque<Dialog>,
    /// Total dialog count as reported by the first server response.
    /// `None` until the first page is fetched.
    pub total: Option<i32>,
}
3692
impl DialogIter {
    // Number of dialogs requested per server round-trip.
    const PAGE_SIZE: i32 = 100;

    /// Total number of dialogs as reported by the server on the first page fetch.
    ///
    /// Returns `None` before the first [`next`](Self::next) call, and `None` for
    /// accounts with fewer dialogs than `PAGE_SIZE` (where the server returns
    /// `messages.Dialogs` instead of `messages.DialogsSlice`).
    pub fn total(&self) -> Option<i32> { self.total }

    /// Fetch the next dialog. Returns `None` when all dialogs have been yielded.
    pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
        // Drain the local buffer before touching the network.
        if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
        if self.done { return Ok(None); }

        let req = tl::functions::messages::GetDialogs {
            exclude_pinned: false,
            folder_id:      None,
            offset_date:    self.offset_date,
            offset_id:      self.offset_id,
            offset_peer:    self.offset_peer.clone(),
            limit:          Self::PAGE_SIZE,
            hash:           0,
        };

        let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
        // Populate total from the first response (messages.DialogsSlice carries a count).
        if self.total.is_none() {
            self.total = count;
        }
        // A short or empty page means there is nothing left to fetch.
        if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
            self.done = true;
        }

        // Prepare cursor for next page
        if let Some(last) = dialogs.last() {
            // Cursor is (date, id, peer) of the last dialog's top message; a
            // missing or empty message falls back to date 0.
            self.offset_date = last.message.as_ref().map(|m| match m {
                tl::enums::Message::Message(x) => x.date,
                tl::enums::Message::Service(x) => x.date,
                _ => 0,
            }).unwrap_or(0);
            self.offset_id = last.top_message();
            if let Some(peer) = last.peer() {
                self.offset_peer = client.inner.peer_cache.read().await.peer_to_input(peer);
            }
        }

        self.buffer.extend(dialogs);
        Ok(self.buffer.pop_front())
    }
}
3744
/// Cursor-based iterator over message history. Created by [`Client::iter_messages`].
pub struct MessageIter {
    // Peer whose history is being paged through.
    peer:      tl::enums::Peer,
    // Pagination cursor: id of the oldest message fetched so far (0 = start
    // from the latest message).
    offset_id: i32,
    // Set once the server returns a short or empty page; no further requests.
    done:      bool,
    // Messages already fetched but not yet yielded by `next`.
    buffer:    VecDeque<update::IncomingMessage>,
    /// Total message count from the first server response (messages.Slice).
    /// `None` until the first page is fetched, `None` for `messages.Messages`
    /// (which returns an exact slice with no separate count).
    pub total: Option<i32>,
}
3756
3757impl MessageIter {
3758    const PAGE_SIZE: i32 = 100;
3759
3760    /// Total message count from the first server response.
3761    ///
3762    /// Returns `None` before the first [`next`](Self::next) call, or for chats
3763    /// where the server returns an exact (non-slice) response.
3764    pub fn total(&self) -> Option<i32> { self.total }
3765
3766    /// Fetch the next message (newest first). Returns `None` when all messages have been yielded.
3767    pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
3768        if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
3769        if self.done { return Ok(None); }
3770
3771        let input_peer = client.inner.peer_cache.read().await.peer_to_input(&self.peer);
3772        let (page, count) = client.get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
3773
3774        if self.total.is_none() { self.total = count; }
3775
3776        if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
3777            self.done = true;
3778        }
3779        if let Some(last) = page.last() {
3780            self.offset_id = last.id();
3781        }
3782
3783        self.buffer.extend(page);
3784        Ok(self.buffer.pop_front())
3785    }
3786}
3787
// ─── Public random helper (used by media.rs) ──────────────────────────────────

/// Public wrapper for `random_i64` used by sub-modules.
/// Kept `#[doc(hidden)]` because it is an implementation detail, not public API.
#[doc(hidden)]
pub fn random_i64_pub() -> i64 { random_i64() }
3793
// ─── Connection ───────────────────────────────────────────────────────────────

/// How framing bytes are sent/received on a connection.
#[derive(Clone)]
enum FrameKind {
    /// Compact MTProto framing (see `send_abridged` for the wire format).
    Abridged,
    /// 4-byte little-endian length prefix before each payload.
    Intermediate,
    /// Carries per-direction sequence counters.
    /// NOTE(review): `send_frame` currently frames this identically to
    /// `Intermediate` and the variant is `dead_code` — confirm whether the
    /// full transport (seqno + CRC) is actually implemented anywhere.
    #[allow(dead_code)]
    Full { send_seqno: u32, recv_seqno: u32 },
}
3804
3805
// ─── Split connection types ───────────────────────────────────────────────────

/// Write half of a split connection.  Held under `Mutex` in `ClientInner`.
/// Owns the EncryptedSession (for packing) and the pending-RPC map.
struct ConnectionWriter {
    // TCP write half; the matching read half lives with the reader task.
    write_half: OwnedWriteHalf,
    // Packing/encryption state: auth key, salt, msg_id/seqno allocation.
    enc:        EncryptedSession,
    // Wire framing in use (abridged / intermediate / full).
    frame_kind: FrameKind,
    /// G-04: msg_ids of received content messages waiting to be acked.
    /// Drained into a MsgsAck on every outgoing frame (bundled into container
    /// when sending an RPC, or sent standalone after route_frame).
    pending_ack: Vec<i64>,
    /// G-02: raw TL body bytes of every sent request, keyed by msg_id.
    /// On bad_msg_notification the matching body is re-encrypted with a fresh
    /// msg_id and re-sent transparently.
    sent_bodies: std::collections::HashMap<i64, Vec<u8>>,
}
3823
impl ConnectionWriter {
    /// 256-byte MTProto auth key of the underlying encrypted session.
    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
    /// Server salt currently held by the encrypted session.
    fn first_salt(&self)    -> i64         { self.enc.salt }
    /// Client↔server clock offset held by the encrypted session.
    fn time_offset(&self)   -> i32         { self.enc.time_offset }
}
3829
/// A freshly established, not-yet-split connection: the TCP stream plus the
/// encryption state produced by (or restored for) the DH handshake. Split
/// into writer + read half via [`Connection::into_writer`].
struct Connection {
    stream:     TcpStream,
    enc:        EncryptedSession,
    frame_kind: FrameKind,
}
3835
impl Connection {
    /// Open a TCP stream, optionally via SOCKS5, and apply transport init bytes.
    async fn open_stream(
        addr:      &str,
        socks5:    Option<&crate::socks5::Socks5Config>,
        transport: &TransportKind,
    ) -> Result<(TcpStream, FrameKind), InvocationError> {
        let stream = match socks5 {
            // Proxy path: the SOCKS5 config performs its own handshake.
            Some(proxy) => proxy.connect(addr).await?,
            None => {
                // Let tokio do the TCP handshake properly (await until connected),
                // then apply socket2 keepalive options to the live socket.
                let stream = TcpStream::connect(addr).await
                    .map_err(InvocationError::Io)?;

                // TCP_NODELAY: disable Nagle's algorithm so small frames
                // (MsgsAck, Ping, etc.) are sent immediately without waiting
                // for a previous write to be ACK'd. Without this, a standalone
                // MsgsAck followed by the app's Ping RPC causes Nagle to buffer
                // the Ping until the ack is received (~1 extra RTT ≈ 80 ms).
                stream.set_nodelay(true).ok();

                // TCP-level keepalive: OS sends probes independently of our
                // application-level pings. Catches cases where the network
                // disappears without a TCP RST (e.g. mobile data switching,
                // NAT table expiry) faster than a 15 s application ping would.
                // We use socket2 only for the setsockopt call, not for connect.
                {
                    let sock = socket2::SockRef::from(&stream);
                    let keepalive = TcpKeepalive::new()
                        .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
                        .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
                    // socket2's `with_retries` is unavailable on Windows, so
                    // the probe count stays at the OS default there.
                    #[cfg(not(target_os = "windows"))]
                    let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
                    // Best-effort: keepalive failure is not fatal.
                    sock.set_tcp_keepalive(&keepalive).ok();
                }
                stream
            }
        };
        Self::apply_transport_init(stream, transport).await
    }

    /// Send the transport init bytes and return the stream + FrameKind.
    async fn apply_transport_init(
        mut stream: TcpStream,
        transport:  &TransportKind,
    ) -> Result<(TcpStream, FrameKind), InvocationError> {
        match transport {
            TransportKind::Abridged => {
                // A single 0xef byte selects the abridged transport.
                stream.write_all(&[0xef]).await?;
                Ok((stream, FrameKind::Abridged))
            }
            TransportKind::Intermediate => {
                // The 0xeeeeeeee tag selects the intermediate transport.
                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
                Ok((stream, FrameKind::Intermediate))
            }
            TransportKind::Full => {
                // Full transport has no init byte
                Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
            }
            TransportKind::Obfuscated { secret } => {
                // For obfuscated we do the full handshake inside open_obfuscated,
                // then wrap back in a plain TcpStream via into_inner.
                // Since ObfuscatedStream is a different type we reuse the Abridged
                // frame logic internally — the encryption layer handles everything.
                //
                // Implementation note: We convert to Abridged after the handshake
                // because ObfuscatedStream internally already uses Abridged framing
                // with XOR applied on top.  The outer Connection just sends raw bytes.
                let mut nonce = [0u8; 64];
                getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
                // NOTE(review): the MTProto obfuscated spec constrains this
                // random header (first byte != 0xef, first dword not a known
                // transport/HTTP tag, bytes 4..8 non-zero). No such check is
                // visible here — confirm it happens in transport_obfuscated.
                // Write obfuscated handshake header
                let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
                let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
                // Stamp protocol tag into nonce[56..60]
                let mut handshake = nonce;
                handshake[56] = 0xef; handshake[57] = 0xef;
                handshake[58] = 0xef; handshake[59] = 0xef;
                // NOTE(review): the cipher is applied from keystream offset 0
                // onto bytes 56..64, whereas the spec encrypts the whole
                // 64-byte header and copies only its encrypted tail — verify
                // ObfCipher / the decrypting side use the matching offset.
                enc_cipher.apply(&mut handshake[56..]);
                stream.write_all(&handshake).await?;
                Ok((stream, FrameKind::Abridged))
            }
        }
    }

    /// Connect with no existing auth key: run the full three-step MTProto DH
    /// key exchange over a plaintext session to negotiate one.
    async fn connect_raw(
        addr:      &str,
        socks5:    Option<&crate::socks5::Socks5Config>,
        transport: &TransportKind,
    ) -> Result<Self, InvocationError> {
        log::info!("[layer] Connecting to {addr} (DH) …");

        // Wrap the entire DH handshake in a timeout so a silent server
        // response (e.g. a mis-framed transport error) never causes an
        // infinite hang.
        let addr2      = addr.to_string();
        let socks5_c   = socks5.cloned();
        let transport_c = transport.clone();

        let fut = async move {
            let (mut stream, frame_kind) =
                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;

            // Plaintext session: DH messages are exchanged before any
            // encryption key exists.
            let mut plain = Session::new();

            let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
            let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;

            let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
            let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;

            let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
            let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;

            let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            log::info!("[layer] DH complete ✓");

            Ok::<Self, InvocationError>(Self {
                stream,
                enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
                frame_kind,
            })
        };

        tokio::time::timeout(Duration::from_secs(15), fut)
            .await
            .map_err(|_| InvocationError::Deserialize(
                format!("DH handshake with {addr} timed out after 15 s")
            ))?
    }

    /// Connect reusing a previously negotiated auth key — no DH round-trips,
    /// just the TCP/transport setup (also bounded by a 15 s timeout).
    async fn connect_with_key(
        addr:        &str,
        auth_key:    [u8; 256],
        first_salt:  i64,
        time_offset: i32,
        socks5:      Option<&crate::socks5::Socks5Config>,
        transport:   &TransportKind,
    ) -> Result<Self, InvocationError> {
        let addr2       = addr.to_string();
        let socks5_c    = socks5.cloned();
        let transport_c = transport.clone();

        let fut = async move {
            let (stream, frame_kind) =
                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
            Ok::<Self, InvocationError>(Self {
                stream,
                enc: EncryptedSession::new(auth_key, first_salt, time_offset),
                frame_kind,
            })
        };

        tokio::time::timeout(Duration::from_secs(15), fut)
            .await
            .map_err(|_| InvocationError::Deserialize(
                format!("connect_with_key to {addr} timed out after 15 s")
            ))?
    }

    /// 256-byte MTProto auth key of this connection.
    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }

    /// Split into a write-only `ConnectionWriter` and the TCP read half.
    fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
        let (read_half, write_half) = self.stream.into_split();
        let writer = ConnectionWriter {
            write_half,
            enc:        self.enc,
            // The frame kind is cloned so the reader half gets its own copy.
            frame_kind: self.frame_kind.clone(),
            pending_ack: Vec::new(),
            sent_bodies: std::collections::HashMap::new(),
        };
        (writer, read_half, self.frame_kind)
    }
}
4014
4015// ─── Transport framing (multi-kind) ──────────────────────────────────────────
4016
4017/// Send a framed message using the active transport kind.
4018async fn send_frame(
4019    stream: &mut TcpStream,
4020    data:   &[u8],
4021    kind:   &FrameKind,
4022) -> Result<(), InvocationError> {
4023    match kind {
4024        FrameKind::Abridged => send_abridged(stream, data).await,
4025        FrameKind::Intermediate | FrameKind::Full { .. } => {
4026            // Single combined write (fix #1).
4027            let mut frame = Vec::with_capacity(4 + data.len());
4028            frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
4029            frame.extend_from_slice(data);
4030            stream.write_all(&frame).await?;
4031            Ok(())
4032        }
4033    }
4034}
4035
4036// ─── Split-reader helpers ─────────────────────────────────────────────────────
4037
/// Outcome of a timed frame read attempt (see `recv_frame_with_keepalive`).
enum FrameOutcome {
    /// A complete raw frame was read before the keepalive timeout fired.
    Frame(Vec<u8>),
    /// The read (or the keepalive ping write) failed; caller should reconnect.
    Error(InvocationError),
    Keepalive,  // timeout elapsed but ping was sent; caller should loop
}
4044
4045/// Read one frame with a 60-second keepalive timeout (PING_DELAY_SECS).
4046///
4047/// If the timeout fires we send a `PingDelayDisconnect` — this tells Telegram
4048/// to forcibly close the connection after `NO_PING_DISCONNECT` seconds of
4049/// silence, giving us a clean EOF to detect rather than a silently stale socket.
4050/// That mirrors what both grammers and the official Telegram clients do.
4051async fn recv_frame_with_keepalive(
4052    rh:      &mut OwnedReadHalf,
4053    fk:      &FrameKind,
4054    client:  &Client,
4055    _ak:     &[u8; 256],
4056) -> FrameOutcome {
4057    match tokio::time::timeout(Duration::from_secs(PING_DELAY_SECS), recv_frame_read(rh, fk)).await {
4058        Ok(Ok(raw)) => FrameOutcome::Frame(raw),
4059        Ok(Err(e))  => FrameOutcome::Error(e),
4060        Err(_) => {
4061            // Keepalive timeout: send PingDelayDisconnect so Telegram closes the
4062            // connection cleanly (EOF) if it hears nothing for NO_PING_DISCONNECT
4063            // seconds, rather than leaving a silently stale socket.
4064            let ping_req = tl::functions::PingDelayDisconnect {
4065                ping_id:          random_i64(),
4066                disconnect_delay: NO_PING_DISCONNECT,
4067            };
4068            let mut w = client.inner.writer.lock().await;
4069            let wire = w.enc.pack(&ping_req);
4070            let fk = w.frame_kind.clone();
4071            match send_frame_write(&mut w.write_half, &wire, &fk).await {
4072                Ok(()) => FrameOutcome::Keepalive,
4073                // If the write itself fails the connection is already dead.
4074                // Return Error so the reader immediately enters the reconnect loop
4075                // instead of sitting silent for another PING_DELAY_SECS.
4076                Err(e) => FrameOutcome::Error(e),
4077            }
4078        }
4079    }
4080}
4081
4082/// Send a framed message via an OwnedWriteHalf (split connection).
4083///
4084/// Fix #1: Header and payload are combined into a single Vec before calling
4085/// write_all, reducing write syscalls from 2 → 1 per frame.  With Abridged
4086/// framing this previously sent a 1-byte header then the payload in separate
4087/// syscalls (and two TCP segments even with TCP_NODELAY on fast paths).
4088async fn send_frame_write(
4089    stream: &mut OwnedWriteHalf,
4090    data:   &[u8],
4091    kind:   &FrameKind,
4092) -> Result<(), InvocationError> {
4093    match kind {
4094        FrameKind::Abridged => {
4095            let words = data.len() / 4;
4096            // Build header + payload in one allocation → single syscall.
4097            let mut frame = if words < 0x7f {
4098                let mut v = Vec::with_capacity(1 + data.len());
4099                v.push(words as u8);
4100                v
4101            } else {
4102                let mut v = Vec::with_capacity(4 + data.len());
4103                v.extend_from_slice(&[
4104                    0x7f,
4105                    (words & 0xff) as u8,
4106                    ((words >> 8) & 0xff) as u8,
4107                    ((words >> 16) & 0xff) as u8,
4108                ]);
4109                v
4110            };
4111            frame.extend_from_slice(data);
4112            stream.write_all(&frame).await?;
4113            Ok(())
4114        }
4115        FrameKind::Intermediate | FrameKind::Full { .. } => {
4116            let mut frame = Vec::with_capacity(4 + data.len());
4117            frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
4118            frame.extend_from_slice(data);
4119            stream.write_all(&frame).await?;
4120            Ok(())
4121        }
4122    }
4123}
4124
4125/// Receive a framed message via an OwnedReadHalf (split connection).
4126async fn recv_frame_read(
4127    stream: &mut OwnedReadHalf,
4128    kind:   &FrameKind,
4129) -> Result<Vec<u8>, InvocationError> {
4130    match kind {
4131        FrameKind::Abridged => {
4132            let mut h = [0u8; 1];
4133            stream.read_exact(&mut h).await?;
4134            let words = if h[0] < 0x7f {
4135                h[0] as usize
4136            } else {
4137                let mut b = [0u8; 3];
4138                stream.read_exact(&mut b).await?;
4139                b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
4140            };
4141            let len = words * 4;
4142            let mut buf = vec![0u8; len];
4143            stream.read_exact(&mut buf).await?;
4144            Ok(buf)
4145        }
4146        FrameKind::Intermediate | FrameKind::Full { .. } => {
4147            let mut len_buf = [0u8; 4];
4148            stream.read_exact(&mut len_buf).await?;
4149            let len = u32::from_le_bytes(len_buf) as usize;
4150            let mut buf = vec![0u8; len];
4151            stream.read_exact(&mut buf).await?;
4152            Ok(buf)
4153        }
4154    }
4155}
4156
4157
4158/// Send using Abridged framing (used for DH plaintext during connect).
4159async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
4160    let words = data.len() / 4;
4161    // Single combined write (header + payload) — same fix #1 as send_frame_write.
4162    let mut frame = if words < 0x7f {
4163        let mut v = Vec::with_capacity(1 + data.len());
4164        v.push(words as u8);
4165        v
4166    } else {
4167        let mut v = Vec::with_capacity(4 + data.len());
4168        v.extend_from_slice(&[
4169            0x7f,
4170            (words & 0xff) as u8,
4171            ((words >> 8) & 0xff) as u8,
4172            ((words >> 16) & 0xff) as u8,
4173        ]);
4174        v
4175    };
4176    frame.extend_from_slice(data);
4177    stream.write_all(&frame).await?;
4178    Ok(())
4179}
4180
4181async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
4182    let mut h = [0u8; 1];
4183    stream.read_exact(&mut h).await?;
4184    let words = if h[0] < 0x7f {
4185        h[0] as usize
4186    } else {
4187        let mut b = [0u8; 3];
4188        stream.read_exact(&mut b).await?;
4189        let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
4190        // word count of 1 after 0xFF = Telegram 4-byte transport error code
4191        if w == 1 {
4192            let mut code_buf = [0u8; 4];
4193            stream.read_exact(&mut code_buf).await?;
4194            let code = i32::from_le_bytes(code_buf);
4195            return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
4196        }
4197        w
4198    };
4199    // Guard against implausibly large reads — a raw 4-byte transport error
4200    // whose first byte was mis-read as a word count causes a hang otherwise.
4201    if words == 0 || words > 0x8000 {
4202        return Err(InvocationError::Deserialize(
4203            format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
4204        ));
4205    }
4206    let mut buf = vec![0u8; words * 4];
4207    stream.read_exact(&mut buf).await?;
4208    Ok(buf)
4209}
4210
4211/// Receive a plaintext (pre-auth) frame and deserialize it.
4212async fn recv_frame_plain<T: Deserializable>(
4213    stream: &mut TcpStream,
4214    _kind:  &FrameKind,
4215) -> Result<T, InvocationError> {
4216    let raw = recv_abridged(stream).await?; // DH always uses abridged for plaintext
4217    if raw.len() < 20 {
4218        return Err(InvocationError::Deserialize("plaintext frame too short".into()));
4219    }
4220    if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
4221        return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
4222    }
4223    let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
4224    let mut cur  = Cursor::from_slice(&raw[20..20 + body_len]);
4225    T::deserialize(&mut cur).map_err(Into::into)
4226}
4227
4228// ─── MTProto envelope ─────────────────────────────────────────────────────────
4229
/// Classification of one unwrapped MTProto body (see `unwrap_envelope`).
enum EnvelopeResult {
    /// An RPC answer body to hand back to the pending `invoke()`.
    Payload(Vec<u8>),
    /// Server-pushed updates parsed out of the frame.
    Updates(Vec<update::Update>),
    /// Service message only — acknowledged internally, nothing to surface.
    None,
}
4235
/// Peel the MTProto envelope(s) off one decrypted message body and classify
/// the innermost content.
///
/// Recurses through `rpc_result`, `msg_container` and `gzip_packed` wrappers
/// and returns:
/// * `Payload` — an RPC answer body for the pending `invoke()`;
/// * `Updates` — server-pushed updates parsed from the frame;
/// * `None`    — a silenced service message;
/// * `Err(InvocationError::Rpc)` — the body was an `rpc_error`.
///
/// NOTE(review): inside a container only the LAST rpc payload survives
/// (each one overwrites `payload`), and an `rpc_error` aborts the remaining
/// inner messages via `?` — confirm invoke() never expects two answers
/// batched in one container.
fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
    if body.len() < 4 {
        return Err(InvocationError::Deserialize("body < 4 bytes".into()));
    }
    let cid = u32::from_le_bytes(body[..4].try_into().unwrap());

    match cid {
        ID_RPC_RESULT => {
            // rpc_result: ctor (4 bytes) + req_msg_id (8 bytes) precede the
            // inner body — skip 12 bytes and recurse on what is left.
            if body.len() < 12 {
                return Err(InvocationError::Deserialize("rpc_result too short".into()));
            }
            unwrap_envelope(body[12..].to_vec())
        }
        ID_RPC_ERROR => {
            // rpc_error: ctor (4) + error_code:i32 + error_message:string.
            if body.len() < 8 {
                return Err(InvocationError::Deserialize("rpc_error too short".into()));
            }
            let code    = i32::from_le_bytes(body[4..8].try_into().unwrap());
            let message = tl_read_string(&body[8..]).unwrap_or_default();
            Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
        }
        ID_MSG_CONTAINER => {
            // msg_container: ctor (4) + count (4), then `count` inner
            // messages each prefixed by msg_id:i64 + seqno:i32 + len:u32
            // (16 bytes of header before each inner body).
            if body.len() < 8 {
                return Err(InvocationError::Deserialize("container too short".into()));
            }
            let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
            let mut pos = 8usize;
            let mut payload: Option<Vec<u8>> = None;
            let mut updates_buf: Vec<update::Update> = Vec::new();

            for _ in 0..count {
                // A truncated container is tolerated: stop at the first inner
                // message that would run past the end of the buffer.
                if pos + 16 > body.len() { break; }
                let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
                pos += 16;
                if pos + inner_len > body.len() { break; }
                let inner = body[pos..pos + inner_len].to_vec();
                pos += inner_len;
                match unwrap_envelope(inner)? {
                    EnvelopeResult::Payload(p)  => { payload = Some(p); }
                    EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
                    EnvelopeResult::None        => {}
                }
            }
            // Payload takes precedence over updates when a container carries both.
            if let Some(p) = payload {
                Ok(EnvelopeResult::Payload(p))
            } else if !updates_buf.is_empty() {
                Ok(EnvelopeResult::Updates(updates_buf))
            } else {
                Ok(EnvelopeResult::None)
            }
        }
        ID_GZIP_PACKED => {
            // gzip_packed: ctor (4) + packed_data:bytes — inflate and recurse.
            let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
            unwrap_envelope(gz_inflate(&bytes)?)
        }
        // MTProto service messages — silently acknowledged, no payload extracted.
        // NOTE: ID_PONG is intentionally NOT listed here. Pong arrives as a bare
        // top-level frame (never inside rpc_result), so it is handled in route_frame
        // directly. Silencing it here would drop it before invoke() can resolve it.
        ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
        // These are correctly silenced (grammers silences these too)
        | 0xd33b5459  // MsgsStateReq
        | 0x04deb57d  // MsgsStateInfo
        | 0x8cc0d131  // MsgsAllInfo
        | 0x276d3ec6  // MsgDetailedInfo
        | 0x809db6df  // MsgNewDetailedInfo
        | 0x7d861a08  // MsgResendReq / MsgResendAnsReq
        | 0x0949d9dc  // FutureSalt
        | 0xae500895  // FutureSalts
        | 0x9299359f  // HttpWait
        | 0xe22045fc  // DestroySessionOk
        | 0x62d350c9  // DestroySessionNone
        => {
            Ok(EnvelopeResult::None)
        }
        ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
        | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
        | ID_UPDATES_TOO_LONG => {
            Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
        }
        // Anything unrecognized is assumed to be a bare RPC answer body.
        _ => Ok(EnvelopeResult::Payload(body)),
    }
}
4319
4320// ─── Utilities ────────────────────────────────────────────────────────────────
4321
4322fn random_i64() -> i64 {
4323    let mut b = [0u8; 8];
4324    getrandom::getrandom(&mut b).expect("getrandom");
4325    i64::from_le_bytes(b)
4326}
4327
4328/// Apply ±20 % random jitter to a backoff delay.
4329/// Prevents thundering-herd when many clients reconnect simultaneously
4330/// (e.g. after a server restart or a shared network outage).
4331fn jitter_delay(base_ms: u64) -> Duration {
4332    // Use two random bytes for the jitter factor (0..=65535 → 0.80 … 1.20).
4333    let mut b = [0u8; 2];
4334    getrandom::getrandom(&mut b).unwrap_or(());
4335    let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; // 0.0 … 1.0
4336    let factor    = 0.80 + rand_frac * 0.40;                 // 0.80 … 1.20
4337    Duration::from_millis((base_ms as f64 * factor) as u64)
4338}
4339
/// Decode a TL `bytes` value from the head of `data`.
///
/// Short form: a first byte < 254 is the length itself. Long form: a marker
/// byte (>= 254) followed by a 24-bit little-endian length. Returns `None`
/// when the buffer is shorter than the declared length; an empty input is
/// leniently decoded as empty bytes.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let Some(&first) = data.first() else { return Some(Vec::new()) };
    let (start, len) = if first < 254 {
        (1usize, first as usize)
    } else if data.len() >= 4 {
        (4usize, data[1] as usize | (data[2] as usize) << 8 | (data[3] as usize) << 16)
    } else {
        return None;
    };
    data.get(start..start + len).map(<[u8]>::to_vec)
}
4349
4350fn tl_read_string(data: &[u8]) -> Option<String> {
4351    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
4352}
4353
4354fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
4355    use std::io::Read;
4356    let mut out = Vec::new();
4357    if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
4358        return Ok(out);
4359    }
4360    out.clear();
4361    flate2::read::ZlibDecoder::new(data)
4362        .read_to_end(&mut out)
4363        .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
4364    Ok(out)
4365}
4366
4367// ── G-06: outgoing gzip compression ──────────────────────────────────────────
4368
/// Compression threshold for outgoing bodies, in bytes.
/// Bodies of at most this size are sent uncompressed (`maybe_gz_pack`
/// compares with `<=`): below this the gzip_packed wrapper overhead
/// exceeds the gain.
const COMPRESSION_THRESHOLD: usize = 512;
4372
/// TL `bytes` wire encoding (used inside gzip_packed).
///
/// Short form (len < 254): 1-byte length + data. Long form: 0xfe marker +
/// 24-bit little-endian length + data. Both forms are zero-padded so the
/// total encoding is a multiple of 4 bytes.
fn tl_write_bytes(data: &[u8]) -> Vec<u8> {
    let len = data.len();
    let mut out = Vec::with_capacity(8 + len);
    let header = if len < 254 {
        out.push(len as u8);
        1
    } else {
        out.push(0xfe);
        out.extend_from_slice(&(len as u32).to_le_bytes()[..3]);
        4
    };
    out.extend_from_slice(data);
    // Pad (header + data) up to the next multiple of 4 with zero bytes.
    let pad = (4 - (header + len) % 4) % 4;
    out.resize(out.len() + pad, 0);
    out
}
4391
4392/// Wrap `data` in a `gzip_packed#3072cfa1 packed_data:bytes` TL frame.
4393fn gz_pack_body(data: &[u8]) -> Vec<u8> {
4394    use std::io::Write;
4395    let mut enc = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
4396    let _ = enc.write_all(data);
4397    let compressed = enc.finish().unwrap_or_default();
4398    let mut out = Vec::with_capacity(4 + 4 + compressed.len());
4399    out.extend_from_slice(&ID_GZIP_PACKED.to_le_bytes());
4400    out.extend(tl_write_bytes(&compressed));
4401    out
4402}
4403
4404/// Optionally compress `data`.  Returns the compressed `gzip_packed` wrapper
4405/// if it is shorter than the original; otherwise returns `data` unchanged.
4406fn maybe_gz_pack(data: &[u8]) -> Vec<u8> {
4407    if data.len() <= COMPRESSION_THRESHOLD {
4408        return data.to_vec();
4409    }
4410    let packed = gz_pack_body(data);
4411    if packed.len() < data.len() { packed } else { data.to_vec() }
4412}
4413
4414// ── G-04 + G-07: MsgsAck body builder ───────────────────────────────────────
4415
4416/// Build the TL body for `msgs_ack#62d6b459 msg_ids:Vector<long>`.
4417fn build_msgs_ack_body(msg_ids: &[i64]) -> Vec<u8> {
4418    // msgs_ack#62d6b459 msg_ids:Vector<long>
4419    // Vector<long>: 0x1cb5c415 + count:int + [i64...]
4420    let mut out = Vec::with_capacity(4 + 4 + 4 + msg_ids.len() * 8);
4421    out.extend_from_slice(&ID_MSGS_ACK.to_le_bytes());
4422    out.extend_from_slice(&0x1cb5c415_u32.to_le_bytes()); // Vector constructor
4423    out.extend_from_slice(&(msg_ids.len() as u32).to_le_bytes());
4424    for &id in msg_ids {
4425        out.extend_from_slice(&id.to_le_bytes());
4426    }
4427    out
4428}
4429
4430// ── G-07: MessageContainer body builder ──────────────────────────────────────
4431
4432/// Build the body of a `msg_container#73f1f8dc` from a list of
4433/// `(msg_id, seqno, body)` inner messages.
4434///
4435/// The caller is responsible for allocating msg_id and seqno for each entry
4436/// via `EncryptedSession::alloc_msg_seqno`.
4437fn build_container_body(messages: &[(i64, i32, &[u8])]) -> Vec<u8> {
4438    let total_body: usize = messages.iter().map(|(_, _, b)| 16 + b.len()).sum();
4439    let mut out = Vec::with_capacity(8 + total_body);
4440    out.extend_from_slice(&ID_MSG_CONTAINER.to_le_bytes());
4441    out.extend_from_slice(&(messages.len() as u32).to_le_bytes());
4442    for &(msg_id, seqno, body) in messages {
4443        out.extend_from_slice(&msg_id.to_le_bytes());
4444        out.extend_from_slice(&seqno.to_le_bytes());
4445        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
4446        out.extend_from_slice(body);
4447    }
4448    out
4449}