// layer_client/lib.rs
1#![doc(html_root_url = "https://docs.rs/layer-client/0.4.0")]
2//! # layer-client
3//!
4//! Production-grade async Telegram client built on MTProto.
5//!
6//! ## Features
7//! - User login (phone + code + 2FA SRP) and bot token login
8//! - Peer access-hash caching — API calls always carry correct access hashes
9//! - `FLOOD_WAIT` auto-retry with configurable policy
10//! - Typed async update stream: `NewMessage`, `MessageEdited`, `MessageDeleted`,
11//!   `CallbackQuery`, `InlineQuery`, `InlineSend`, `Raw`
12//! - Send / edit / delete / forward / pin messages
13//! - Search messages (per-chat and global)
14//! - Mark as read, delete dialogs, clear mentions
15//! - Join chat / accept invite links
16//! - Chat action (typing, uploading, …)
17//! - `get_me()` — fetch own User info
18//! - Paginated dialog and message iterators
19//! - DC migration, session persistence, reconnect
20
21#![deny(unsafe_code)]
22
23mod errors;
24mod retry;
25mod session;
26mod transport;
27mod two_factor_auth;
28pub mod update;
29pub mod parsers;
30pub mod media;
31pub mod participants;
32pub mod pts;
33
34// ── New feature modules ───────────────────────────────────────────────────────
35pub mod dc_pool;
36pub mod transport_obfuscated;
37pub mod transport_intermediate;
38pub mod socks5;
39pub mod session_backend;
40pub mod inline_iter;
41pub mod typing_guard;
42pub mod keyboard;
43pub mod search;
44pub mod types;
45
46#[macro_use]
47pub mod macros;
48
49pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
50pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
51pub use update::Update;
52pub use media::{UploadedFile, DownloadIter, Photo, Document, Sticker, Downloadable};
53pub use participants::Participant;
54pub use typing_guard::TypingGuard;
55pub use socks5::Socks5Config;
56pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
57pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
58pub use search::{SearchBuilder, GlobalSearchBuilder};
59pub use types::{User, Group, Channel, Chat};
60
61/// Re-export of `layer_tl_types` — generated TL constructors, functions, and enums.
62/// Users can write `use layer_client::tl` instead of adding a separate `layer-tl-types` dep.
63pub use layer_tl_types as tl;
64
65use std::collections::HashMap;
66use std::collections::VecDeque;
67use std::num::NonZeroU32;
68use std::ops::ControlFlow;
69use std::sync::Arc;
70use std::time::Duration;
71
72use layer_mtproto::{EncryptedSession, Session, authentication as auth};
73use layer_tl_types::{Cursor, Deserializable, RemoteCall};
74use session::{DcEntry, PersistedSession};
75use tokio::io::{AsyncReadExt, AsyncWriteExt};
76use tokio::net::TcpStream;
77use tokio::sync::{mpsc, oneshot, Mutex};
78use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
79use tokio::time::sleep;
80use tokio_util::sync::CancellationToken;
81use socket2::TcpKeepalive;
82
// ─── MTProto envelope constructor IDs ────────────────────────────────────────
//
// Bare service-schema constructor IDs the reader task must recognise *before*
// handing a frame to TL deserialization (they arrive outside rpc_result).

const ID_RPC_RESULT:       u32 = 0xf35c6d01;
const ID_RPC_ERROR:        u32 = 0x2144ca19;
const ID_MSG_CONTAINER:    u32 = 0x73f1f8dc;
const ID_GZIP_PACKED:      u32 = 0x3072cfa1;
const ID_PONG:             u32 = 0x347773c5;
const ID_MSGS_ACK:         u32 = 0x62d6b459;
const ID_BAD_SERVER_SALT:  u32 = 0xedab447b;
const ID_NEW_SESSION:      u32 = 0x9ec20908;
const ID_BAD_MSG_NOTIFY:      u32 = 0xa7eff811;
// G-09: FutureSalts arrives as a bare frame (not inside rpc_result)
const ID_FUTURE_SALTS:        u32 = 0xae500895;
// G-11: server confirms our message was received; we must ack its answer_msg_id
const ID_MSG_DETAILED_INFO:   u32 = 0x276d3ec6;
const ID_MSG_NEW_DETAIL_INFO: u32 = 0x809db6df;
// G-14: server asks us to re-send a specific message
const ID_MSG_RESEND_REQ:      u32 = 0x7d861a08;
// Update containers — dispatched to the typed update stream.
const ID_UPDATES:             u32 = 0x74ae4240;
const ID_UPDATE_SHORT:     u32 = 0x78d4dec1;
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
const ID_UPDATE_SHORT_MSG:      u32 = 0x313bc7f8;
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
const ID_UPDATES_TOO_LONG:      u32 = 0xe317af7e;

// ─── Keepalive / reconnect tuning ─────────────────────────────────────────────

/// How often to send a keepalive ping.
/// Telegram Desktop uses ~15 s on mobile connections.  60 s is too slow to
/// detect a dead connection before the user notices.
const PING_DELAY_SECS: u64 = 15;

/// Tell Telegram to close the connection if it hears nothing for this many
/// seconds.  Must be > PING_DELAY_SECS so a single missed ping doesn't drop us.
///
/// NOTE(review): with `PING_DELAY_SECS = 15`, a single *missed* ping means the
/// server hears nothing until the next ping at t = 30 s — but it disconnects at
/// t = 20 s. Surviving one missed ping would require a value greater than
/// 2 × PING_DELAY_SECS; confirm whether 20 is intentional (fast dead-link
/// detection) before relying on the comment above.
const NO_PING_DISCONNECT: i32 = 20;

/// Initial backoff before the first reconnect attempt.
const RECONNECT_BASE_MS: u64 = 500;

/// Maximum backoff between reconnect attempts.
/// 5 s cap instead of 30 s — on mobile, network outages are brief and a 30 s
/// sleep means the bot stays dead for up to 30 s after the network returns.
/// Official Telegram mobile clients use a short cap for the same reason.
const RECONNECT_MAX_SECS: u64 = 5;

/// TCP socket-level keepalive: start probes after this many seconds of idle.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
/// Interval between TCP keepalive probes.
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
/// Number of failed probes before the OS declares the connection dead.
const TCP_KEEPALIVE_PROBES: u32 = 3;
134
135// ─── PeerCache ────────────────────────────────────────────────────────────────
136
/// Caches access hashes for users and channels so every API call carries the
/// correct hash without re-resolving peers.
///
/// Basic groups (`chat_id` peers) carry no access hash in MTProto, so only
/// users and channels are cached here.
///
/// All fields are `pub` so that `save_session` / `connect` can read/write them
/// directly, and so that advanced callers can inspect the cache.
#[derive(Default)]
pub struct PeerCache {
    /// user_id → access_hash
    pub users:    HashMap<i64, i64>,
    /// channel_id → access_hash
    pub channels: HashMap<i64, i64>,
}
149
150impl PeerCache {
151    fn cache_user(&mut self, user: &tl::enums::User) {
152        if let tl::enums::User::User(u) = user
153            && let Some(hash) = u.access_hash {
154            self.users.insert(u.id, hash);
155        }
156    }
157
158    fn cache_chat(&mut self, chat: &tl::enums::Chat) {
159        match chat {
160            tl::enums::Chat::Channel(c) => {
161                if let Some(hash) = c.access_hash {
162                    self.channels.insert(c.id, hash);
163                }
164            }
165            tl::enums::Chat::ChannelForbidden(c) => {
166                self.channels.insert(c.id, c.access_hash);
167            }
168            _ => {}
169        }
170    }
171
172    fn cache_users(&mut self, users: &[tl::enums::User]) {
173        for u in users { self.cache_user(u); }
174    }
175
176    fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
177        for c in chats { self.cache_chat(c); }
178    }
179
180    fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
181        if user_id == 0 {
182            return tl::enums::InputPeer::PeerSelf;
183        }
184        let hash = self.users.get(&user_id).copied().unwrap_or(0);
185        tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
186    }
187
188    fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
189        let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
190        tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
191    }
192
193    fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
194        match peer {
195            tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
196            tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
197                tl::types::InputPeerChat { chat_id: c.chat_id }
198            ),
199            tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
200        }
201    }
202}
203
204// ─── InputMessage builder ─────────────────────────────────────────────────────
205
/// Builder for composing outgoing messages.
///
/// ```rust,no_run
/// use layer_client::InputMessage;
///
/// let msg = InputMessage::text("Hello, *world*!")
///     .silent(true)
///     .reply_to(Some(42));
/// ```
#[derive(Clone, Default)]
pub struct InputMessage {
    /// Message body (used as the caption when `media` is set).
    pub text:         String,
    /// ID of the message this one replies to, if any.
    pub reply_to:     Option<i32>,
    /// Deliver without a notification sound.
    pub silent:       bool,
    /// Send in background.
    pub background:   bool,
    /// Clear the chat draft after sending.
    pub clear_draft:  bool,
    /// Disable the link preview.
    pub no_webpage:   bool,
    /// Show media above the caption instead of below (Telegram ≥ 10.3).
    pub invert_media: bool,
    /// Schedule to send when the user goes online (`schedule_date = 0x7FFFFFFE`).
    pub schedule_once_online: bool,
    /// Formatting entities (bold, italic, code, links, …).
    pub entities:     Option<Vec<tl::enums::MessageEntity>>,
    /// Inline or reply keyboard attached to the message.
    pub reply_markup: Option<tl::enums::ReplyMarkup>,
    /// Unix timestamp to schedule the message for.
    pub schedule_date: Option<i32>,
    /// Attached media to send alongside the message (G-45).
    /// Use [`InputMessage::copy_media`] to attach media copied from an existing message.
    pub media: Option<tl::enums::InputMedia>,
}
234
235impl InputMessage {
236    /// Create a message with the given text.
237    pub fn text(text: impl Into<String>) -> Self {
238        Self { text: text.into(), ..Default::default() }
239    }
240
241    /// Set the message text.
242    pub fn set_text(mut self, text: impl Into<String>) -> Self {
243        self.text = text.into(); self
244    }
245
246    /// Reply to a specific message ID.
247    pub fn reply_to(mut self, id: Option<i32>) -> Self {
248        self.reply_to = id; self
249    }
250
251    /// Send silently (no notification sound).
252    pub fn silent(mut self, v: bool) -> Self {
253        self.silent = v; self
254    }
255
256    /// Send in background.
257    pub fn background(mut self, v: bool) -> Self {
258        self.background = v; self
259    }
260
261    /// Clear the draft after sending.
262    pub fn clear_draft(mut self, v: bool) -> Self {
263        self.clear_draft = v; self
264    }
265
266    /// Disable link preview.
267    pub fn no_webpage(mut self, v: bool) -> Self {
268        self.no_webpage = v; self
269    }
270
271    /// Show media above the caption rather than below (requires Telegram ≥ 10.3).
272    pub fn invert_media(mut self, v: bool) -> Self {
273        self.invert_media = v; self
274    }
275
276    /// Schedule the message to be sent when the recipient comes online.
277    ///
278    /// Mutually exclusive with `schedule_date` — calling this last wins.
279    /// Uses the Telegram magic value `0x7FFFFFFE`.
280    pub fn schedule_once_online(mut self) -> Self {
281        self.schedule_once_online = true;
282        self.schedule_date = None;
283        self
284    }
285
286    /// Attach formatting entities (bold, italic, code, links, etc).
287    pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
288        self.entities = Some(e); self
289    }
290
291    /// Attach a reply markup (inline or reply keyboard).
292    pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
293        self.reply_markup = Some(rm); self
294    }
295
296    /// Shorthand for attaching an [`crate::keyboard::InlineKeyboard`].
297    ///
298    /// ```rust,no_run
299    /// use layer_client::{InputMessage, keyboard::{InlineKeyboard, Button}};
300    ///
301    /// let msg = InputMessage::text("Pick one:")
302    ///     .keyboard(InlineKeyboard::new()
303    ///         .row([Button::callback("A", b"a"), Button::callback("B", b"b")]));
304    /// ```
305    pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
306        self.reply_markup = Some(kb.into()); self
307    }
308
309    /// Schedule the message for a future Unix timestamp.
310    pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
311        self.schedule_date = ts; self
312    }
313
314    /// Attach media copied from an existing message (G-45).
315    ///
316    /// Pass the `InputMedia` obtained from [`crate::media::Photo`],
317    /// [`crate::media::Document`], or directly from a raw `MessageMedia`.
318    ///
319    /// When a `media` is set, the message is sent via `messages.SendMedia`
320    /// instead of `messages.SendMessage`.
321    ///
322    /// ```rust,no_run
323    /// # use layer_client::{InputMessage, tl};
324    /// # fn example(media: tl::enums::InputMedia) {
325    /// let msg = InputMessage::text("Here is the file again")
326    ///     .copy_media(media);
327    /// # }
328    /// ```
329    pub fn copy_media(mut self, media: tl::enums::InputMedia) -> Self {
330        self.media = Some(media); self
331    }
332
333    /// Remove any previously attached media.
334    pub fn clear_media(mut self) -> Self {
335        self.media = None; self
336    }
337
338    fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
339        self.reply_to.map(|id| {
340            tl::enums::InputReplyTo::Message(
341                tl::types::InputReplyToMessage {
342                    reply_to_msg_id: id,
343                    top_msg_id: None,
344                    reply_to_peer_id: None,
345                    quote_text: None,
346                    quote_entities: None,
347                    quote_offset: None,
348                    monoforum_peer_id: None,
349                    todo_item_id: None,
350                    poll_option: None,
351                }
352            )
353        })
354    }
355}
356
357impl From<&str> for InputMessage {
358    fn from(s: &str) -> Self { Self::text(s) }
359}
360
361impl From<String> for InputMessage {
362    fn from(s: String) -> Self { Self::text(s) }
363}
364
365// ─── TransportKind ────────────────────────────────────────────────────────────
366
/// Which MTProto transport framing to use for all connections.
///
/// | Variant | Init bytes | Notes |
/// |---------|-----------|-------|
/// | `Abridged` | `0xef` | Default, smallest overhead |
/// | `Intermediate` | `0xeeeeeeee` | Better proxy compat |
/// | `Full` | none | Adds seqno + CRC32 |
/// | `Obfuscated` | random 64B | Bypasses DPI / MTProxy |
#[derive(Clone, Debug, Default)]
pub enum TransportKind {
    /// MTProto [Abridged] transport — length prefix is 1 or 4 bytes.
    ///
    /// [Abridged]: https://core.telegram.org/mtproto/mtproto-transports#abridged
    #[default]
    Abridged,
    /// MTProto [Intermediate] transport — 4-byte LE length prefix.
    ///
    /// [Intermediate]: https://core.telegram.org/mtproto/mtproto-transports#intermediate
    Intermediate,
    /// MTProto [Full] transport — 4-byte length + seqno + CRC32.
    ///
    /// [Full]: https://core.telegram.org/mtproto/mtproto-transports#full
    Full,
    /// [Obfuscated2] transport — XOR stream cipher over Abridged framing.
    /// Required for MTProxy and networks with deep-packet inspection.
    ///
    /// `secret` is the 16-byte proxy secret, or `None` for keyless obfuscation
    /// (obfuscation without a proxy, to defeat plain DPI).
    ///
    /// [Obfuscated2]: https://core.telegram.org/mtproto/mtproto-transports#obfuscated-2
    Obfuscated { secret: Option<[u8; 16]> },
}
398
399// ─── Config ───────────────────────────────────────────────────────────────────
400
/// A token that can be used to gracefully shut down a [`Client`].
///
/// Obtained from [`Client::connect`] — call [`ShutdownToken::cancel`] to begin
/// graceful shutdown. All pending requests will finish and the reader task will
/// exit cleanly.
///
/// This is a plain alias for `tokio_util`'s `CancellationToken`, so it is
/// cheaply cloneable and supports child tokens.
///
/// # Example
/// ```rust,no_run
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use layer_client::{Client, Config, ShutdownToken};
///
/// let (client, shutdown) = Client::connect(Config::default()).await?;
///
/// // In a signal handler or background task:
/// // shutdown.cancel();
/// # Ok(()) }
/// ```
pub type ShutdownToken = CancellationToken;
419
/// Configuration for [`Client::connect`].
#[derive(Clone)]
pub struct Config {
    /// Telegram API ID (obtained from <https://my.telegram.org>).
    pub api_id:         i32,
    /// Telegram API hash paired with `api_id`.
    pub api_hash:       String,
    /// Optional override for the initial DC address (`"ip:port"`).
    /// NOTE(review): not read anywhere in the visible `connect` path, which
    /// hard-codes DC2 in `fresh_connect` — confirm this field is honoured.
    pub dc_addr:        Option<String>,
    /// How to react to `FLOOD_WAIT` and transient RPC failures.
    pub retry_policy:   Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy — every Telegram connection is tunnelled through it.
    pub socks5:         Option<crate::socks5::Socks5Config>,
    /// Allow IPv6 DC addresses when populating the DC table (default: false).
    pub allow_ipv6:     bool,
    /// Which MTProto transport framing to use (default: Abridged).
    pub transport:      TransportKind,
    /// Session persistence backend (default: binary file `"layer.session"`).
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// If `true`, replay missed updates via `updates.getDifference` immediately
    /// after connecting. Mirrors grammers' `UpdatesConfiguration { catch_up: true }`.
    /// Default: `false`.
    pub catch_up:       bool,
}
440
441impl Default for Config {
442    fn default() -> Self {
443        Self {
444            api_id:          0,
445            api_hash:        String::new(),
446            dc_addr:         None,
447            retry_policy:    Arc::new(AutoSleep::default()),
448            socks5:          None,
449            allow_ipv6:      false,
450            transport:       TransportKind::Abridged,
451            session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
452            catch_up:        false,
453        }
454    }
455}
456
457// ─── UpdateStream ─────────────────────────────────────────────────────────────
458// G-56: UpdateStream lives here; next_raw() added.
459
/// Asynchronous stream of [`Update`]s.
pub struct UpdateStream {
    /// Receiving end of the unbounded channel fed by the client's reader task.
    rx: mpsc::UnboundedReceiver<update::Update>,
}
464
465impl UpdateStream {
466    /// Wait for the next update. Returns `None` when the client has disconnected.
467    pub async fn next(&mut self) -> Option<update::Update> {
468        self.rx.recv().await
469    }
470
471    /// Wait for the next **raw** (unrecognised) update frame, skipping all
472    /// typed high-level variants. Useful for handling constructor IDs that
473    /// `layer-client` does not yet wrap — dispatch on `constructor_id` yourself.
474    ///
475    /// Returns `None` when the client has disconnected.
476    pub async fn next_raw(&mut self) -> Option<update::RawUpdate> {
477        loop {
478            match self.rx.recv().await? {
479                update::Update::Raw(r) => return Some(r),
480                _ => continue,
481            }
482        }
483    }
484}
485
486// ─── Dialog ───────────────────────────────────────────────────────────────────
487
/// A Telegram dialog (chat, user, channel).
#[derive(Debug, Clone)]
pub struct Dialog {
    /// The raw TL dialog (regular dialog or folder pseudo-dialog).
    pub raw:     tl::enums::Dialog,
    /// The dialog's top message, when the server included it.
    pub message: Option<tl::enums::Message>,
    /// The user entity, for private chats.
    pub entity:  Option<tl::enums::User>,
    /// The chat/channel entity, for group and channel dialogs.
    pub chat:    Option<tl::enums::Chat>,
}
496
497impl Dialog {
498    /// The dialog's display title.
499    pub fn title(&self) -> String {
500        if let Some(tl::enums::User::User(u)) = &self.entity {
501            let first = u.first_name.as_deref().unwrap_or("");
502            let last  = u.last_name.as_deref().unwrap_or("");
503            let name  = format!("{first} {last}").trim().to_string();
504            if !name.is_empty() { return name; }
505        }
506        if let Some(chat) = &self.chat {
507            return match chat {
508                tl::enums::Chat::Chat(c)         => c.title.clone(),
509                tl::enums::Chat::Forbidden(c) => c.title.clone(),
510                tl::enums::Chat::Channel(c)      => c.title.clone(),
511                tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
512                tl::enums::Chat::Empty(_)        => "(empty)".into(),
513            };
514        }
515        "(Unknown)".to_string()
516    }
517
518    /// Peer of this dialog.
519    pub fn peer(&self) -> Option<&tl::enums::Peer> {
520        match &self.raw {
521            tl::enums::Dialog::Dialog(d) => Some(&d.peer),
522            tl::enums::Dialog::Folder(_) => None,
523        }
524    }
525
526    /// Unread message count.
527    pub fn unread_count(&self) -> i32 {
528        match &self.raw {
529            tl::enums::Dialog::Dialog(d) => d.unread_count,
530            _ => 0,
531        }
532    }
533
534    /// ID of the top message.
535    pub fn top_message(&self) -> i32 {
536        match &self.raw {
537            tl::enums::Dialog::Dialog(d) => d.top_message,
538            _ => 0,
539        }
540    }
541}
542
543// ─── ClientInner ─────────────────────────────────────────────────────────────
544
/// Shared, reference-counted state behind every [`Client`] clone.
struct ClientInner {
    /// Write half of the connection — holds the EncryptedSession (for packing)
    /// and the send half of the TCP stream. The read half is owned by the
    /// reader task started in connect().
    writer:          Mutex<ConnectionWriter>,
    /// Pending RPC replies, keyed by MTProto msg_id.
    /// RPC callers insert a oneshot::Sender here before sending; the reader
    /// task routes incoming rpc_result frames to the matching sender.
    #[allow(clippy::type_complexity)]
    pending:         Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Channel used to hand a new (OwnedReadHalf, FrameKind, auth_key, session_id)
    /// to the reader task after a reconnect.
    reconnect_tx:    mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Send `()` here to wake the reader's reconnect backoff loop immediately.
    /// Used by [`Client::signal_network_restored`].
    network_hint_tx: mpsc::UnboundedSender<()>,
    /// Cancelled to signal graceful shutdown to the reader task.
    #[allow(dead_code)]
    shutdown_token:  CancellationToken,
    /// Whether to replay missed updates via getDifference on connect.
    #[allow(dead_code)]
    catch_up:        bool,
    /// ID of the data centre this account's session currently lives on.
    home_dc_id:      Mutex<i32>,
    /// Known DC entries keyed by DC id (defaults merged with the saved session).
    dc_options:      Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache for users and channels (see [`PeerCache`]).
    pub peer_cache:    Mutex<PeerCache>,
    /// Update-sequence state (pts/qts/date/seq and per-channel pts).
    pub pts_state:     Mutex<pts::PtsState>,
    /// G-17: Buffer for updates received during a possible-gap window.
    pub possible_gap:  Mutex<pts::PossibleGapBuffer>,
    /// API credentials copied from [`Config`].
    api_id:          i32,
    api_hash:        String,
    /// FLOOD_WAIT / transient-failure retry policy for RPC invocations.
    retry_policy:    Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy config, reused for reconnects.
    socks5:          Option<crate::socks5::Socks5Config>,
    /// Allow IPv6 DC addresses when (re)populating the DC table.
    allow_ipv6:      bool,
    /// MTProto transport framing used for every connection.
    transport:       TransportKind,
    /// Backend the session (keys, DC table, peers, pts) is persisted to.
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Per-DC connection pool — presumably for media/export calls to foreign
    /// DCs; confirm against the `dc_pool` module.
    dc_pool:         Mutex<dc_pool::DcPool>,
    /// Producer side of the update channel delivered to the user.
    update_tx:       mpsc::UnboundedSender<update::Update>,
}
583
/// The main Telegram client. Cheap to clone — internally Arc-wrapped.
#[derive(Clone)]
pub struct Client {
    pub(crate) inner: Arc<ClientInner>,
    /// Holds the receiving end of the update channel so it is not dropped
    /// (and thus closed) while any `Client` clone is alive. Presumably an
    /// `update_stream()`-style accessor hands it out — confirm elsewhere in
    /// the file.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
590
591impl Client {
    // ── Connect ────────────────────────────────────────────────────────────

    /// Connect to Telegram and spawn the background reader task.
    ///
    /// Bootstrap order (load-bearing — do not reorder casually):
    /// 1. Try to restore a persisted session (auth key + DC table) from
    ///    `config.session_backend`; any failure falls back to a fresh
    ///    handshake via `fresh_connect`.
    /// 2. Split the TCP stream and spawn the reader task *before* calling
    ///    `init_connection`, so that RPCs issued during init can receive
    ///    their responses.
    /// 3. If `init_connection` fails (e.g. a stale auth key rejected by the
    ///    server), redo the handshake once and retry.
    /// 4. Restore the peer access-hash cache and the update (pts) state from
    ///    the saved session; with `catch_up = true` and saved state, replay
    ///    missed updates via `getDifference` in a background task.
    ///
    /// Returns the client plus a [`ShutdownToken`]; cancelling the token
    /// stops the reader task gracefully.
    pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
        let (update_tx, update_rx) = mpsc::unbounded_channel();

        // ── Load or fresh-connect ───────────────────────────────────────
        let socks5    = config.socks5.clone();
        let transport = config.transport.clone();

        let (conn, home_dc_id, dc_opts, loaded_session) =
            match config.session_backend.load()
                .map_err(InvocationError::Io)?
            {
                Some(s) => {
                    if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
                        if let Some(key) = dc.auth_key {
                            log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
                            match Connection::connect_with_key(
                                &dc.addr, key, dc.first_salt, dc.time_offset,
                                socks5.as_ref(), &transport,
                            ).await {
                                Ok(c) => {
                                    // Seed the DC table with the built-in defaults,
                                    // then let saved entries (with auth keys) win.
                                    let mut opts = session::default_dc_addresses()
                                        .into_iter()
                                        .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
                                        .collect::<HashMap<_, _>>();
                                    for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
                                    (c, s.home_dc_id, opts, Some(s))
                                }
                                Err(e) => {
                                    log::warn!("[layer] Session connect failed ({e}), fresh connect …");
                                    let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                                    (c, dc, opts, None)
                                }
                            }
                        } else {
                            // Session exists but the home DC has no auth key.
                            let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                            (c, dc, opts, None)
                        }
                    } else {
                        // Session exists but its home DC is not in its own DC list.
                        let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                        (c, dc, opts, None)
                    }
                }
                None => {
                    // No saved session at all — first run.
                    let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                    (c, dc, opts, None)
                }
            };

        // ── Build DC pool ───────────────────────────────────────────────
        let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());

        // Split the TCP stream immediately.
        // The writer (write half + EncryptedSession) stays in ClientInner.
        // The read half goes to the reader task which we spawn right now so
        // that RPC calls during init_connection work correctly.
        let (writer, read_half, frame_kind) = conn.into_writer();
        let auth_key   = writer.enc.auth_key_bytes();
        let session_id = writer.enc.session_id();

        #[allow(clippy::type_complexity)]
        let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Channel the reconnect logic uses to hand a new read half to the reader task.
        let (reconnect_tx, reconnect_rx) =
            mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();

        // Channel for external "network restored" hints — lets Android/iOS callbacks
        // skip the reconnect backoff and attempt immediately.
        let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();


        // Graceful shutdown token — cancel this to stop the reader task cleanly.
        let shutdown_token = CancellationToken::new();
        let catch_up = config.catch_up;

        let inner = Arc::new(ClientInner {
            writer:          Mutex::new(writer),
            pending:         pending.clone(),
            reconnect_tx,
            network_hint_tx,
            shutdown_token:  shutdown_token.clone(),
            catch_up,
            home_dc_id:      Mutex::new(home_dc_id),
            dc_options:      Mutex::new(dc_opts),
            peer_cache:      Mutex::new(PeerCache::default()),
            pts_state:       Mutex::new(pts::PtsState::default()),
            possible_gap:    Mutex::new(pts::PossibleGapBuffer::new()),
            api_id:          config.api_id,
            api_hash:        config.api_hash,
            retry_policy:    config.retry_policy,
            socks5:          config.socks5,
            allow_ipv6:      config.allow_ipv6,
            transport:       config.transport,
            session_backend: config.session_backend,
            dc_pool:         Mutex::new(pool),
            update_tx,
        });

        let client = Self {
            inner,
            _update_rx: Arc::new(Mutex::new(update_rx)),
        };

        // Spawn the reader task immediately so that RPC calls during
        // init_connection can receive their responses.
        {
            let client_r = client.clone();
            let shutdown_r = shutdown_token.clone();
            tokio::spawn(async move {
                client_r.run_reader_task(
                    read_half, frame_kind, auth_key, session_id,
                    reconnect_rx, network_hint_rx, shutdown_r,
                ).await;
            });
        }

        // If init_connection fails (e.g. stale auth key rejected by Telegram),
        // do a fresh DH handshake and retry once.
        if let Err(e) = client.init_connection().await {
            log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");

            let socks5_r    = client.inner.socks5.clone();
            let transport_r = client.inner.transport.clone();
            let (new_conn, new_dc_id, new_opts) =
                Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;

            {
                let mut dc_guard = client.inner.home_dc_id.lock().await;
                *dc_guard = new_dc_id;
            }
            {
                let mut opts_guard = client.inner.dc_options.lock().await;
                *opts_guard = new_opts;
            }

            // Replace writer and hand new read half to reader task
            let (new_writer, new_read, new_fk) = new_conn.into_writer();
            let new_ak = new_writer.enc.auth_key_bytes();
            let new_sid = new_writer.enc.session_id();
            *client.inner.writer.lock().await = new_writer;
            let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

            client.init_connection().await?;
        }

        // ── Restore peer access-hash cache from session ─────────────────
        if let Some(ref s) = loaded_session
            && !s.peers.is_empty() {
            let mut cache = client.inner.peer_cache.lock().await;
            for p in &s.peers {
                if p.is_channel {
                    cache.channels.entry(p.id).or_insert(p.access_hash);
                } else {
                    cache.users.entry(p.id).or_insert(p.access_hash);
                }
            }
            log::debug!("[layer] Peer cache restored: {} users, {} channels",
                cache.users.len(), cache.channels.len());
        }

        // ── Restore update state / catch-up ─────────────────────────────
        //
        // Two modes:
        //   catch_up=false → always call sync_pts_state() so we start from
        //                    the current server state (ignore saved pts).
        //   catch_up=true  → if we have a saved pts > 0, restore it and let
        //                    get_difference() fetch what we missed.  Only fall
        //                    back to sync_pts_state() when there is no saved
        //                    state (first boot, or fresh session).
        let has_saved_state = loaded_session
            .as_ref()
            .is_some_and(|s| s.updates_state.is_initialised());

        if catch_up && has_saved_state {
            let snap = &loaded_session.as_ref().unwrap().updates_state;
            let mut state = client.inner.pts_state.lock().await;
            state.pts  = snap.pts;
            state.qts  = snap.qts;
            state.date = snap.date;
            state.seq  = snap.seq;
            for &(cid, cpts) in &snap.channels {
                state.channel_pts.insert(cid, cpts);
            }
            log::info!("[layer] Update state restored: pts={}, qts={}, seq={}, {} channels",
                state.pts, state.qts, state.seq, state.channel_pts.len());
            drop(state);

            // Now spawn the catch-up diff — pts is the *old* value, so
            // getDifference will return exactly what we missed.
            let c   = client.clone();
            let utx = client.inner.update_tx.clone();
            tokio::spawn(async move {
                if let Ok(missed) = c.get_difference().await {
                    log::info!("[layer] catch_up: {} missed updates replayed", missed.len());
                    for u in missed { let _ = utx.send(u); }
                }
            });
        } else {
            // No saved state or catch_up disabled — sync from server.
            let _ = client.sync_pts_state().await;
        }

        Ok((client, shutdown_token))
    }
799
800    async fn fresh_connect(
801        socks5:    Option<&crate::socks5::Socks5Config>,
802        transport: &TransportKind,
803    ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
804        log::info!("[layer] Fresh connect to DC2 …");
805        let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
806        let opts = session::default_dc_addresses()
807            .into_iter()
808            .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
809            .collect();
810        Ok((conn, 2, opts))
811    }
812
813    // ── Session ────────────────────────────────────────────────────────────
814
    /// Persist the complete client state via the configured session backend.
    ///
    /// Snapshots, in order: the home-DC auth key / first salt / time offset
    /// (taken live from the writer so the freshest values win), the full DC
    /// address table, any extra auth keys tracked by the DC pool, the updates
    /// (pts/qts/date/seq + per-channel pts) state, and the peer
    /// access-hash cache.
    ///
    /// # Errors
    /// Returns [`InvocationError::Io`] when the backend fails to write.
    pub async fn save_session(&self) -> Result<(), InvocationError> {
        use session::{CachedPeer, UpdatesStateSnap};

        // NOTE(review): locks are taken writer → home_dc_id → dc_options and
        // the writer guard stays held while dc_options is read below; keep
        // this acquisition order consistent with other call sites to avoid
        // deadlocks — TODO confirm against the rest of the file.
        let writer_guard = self.inner.writer.lock().await;
        let home_dc_id   = *self.inner.home_dc_id.lock().await;
        let dc_options   = self.inner.dc_options.lock().await;

        // The home DC's entry is refreshed from the live writer: its auth key,
        // first salt and time offset may have changed since the last save.
        // All other DCs are copied through unchanged.
        let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
            dc_id:       e.dc_id,
            addr:        e.addr.clone(),
            auth_key:    if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
            first_salt:  if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
            time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
        }).collect();
        // Merge in auth keys held by the secondary-DC pool.
        self.inner.dc_pool.lock().await.collect_keys(&mut dcs);

        // Snapshot current pts state (copied out so the lock is released
        // before the backend write below).
        let pts_snap = {
            let s = self.inner.pts_state.lock().await;
            UpdatesStateSnap {
                pts:      s.pts,
                qts:      s.qts,
                date:     s.date,
                seq:      s.seq,
                channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
            }
        };

        // Snapshot peer access-hash cache (users and channels flattened into
        // one list; `is_channel` disambiguates on restore).
        let peers: Vec<CachedPeer> = {
            let cache = self.inner.peer_cache.lock().await;
            let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
            for (&id, &hash) in &cache.users    { v.push(CachedPeer { id, access_hash: hash, is_channel: false }); }
            for (&id, &hash) in &cache.channels { v.push(CachedPeer { id, access_hash: hash, is_channel: true  }); }
            v
        };

        self.inner.session_backend
            .save(&PersistedSession { home_dc_id, dcs, updates_state: pts_snap, peers })
            .map_err(InvocationError::Io)?;
        log::info!("[layer] Session saved ✓");
        Ok(())
    }
858
859    // ── Auth ───────────────────────────────────────────────────────────────
860
861    /// Returns `true` if the client is already authorized.
862    pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
863        match self.invoke(&tl::functions::updates::GetState {}).await {
864            Ok(_)  => Ok(true),
865            Err(e) if e.is("AUTH_KEY_UNREGISTERED")
866                   || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
867            Err(e) => Err(e),
868        }
869    }
870
871    /// Sign in as a bot.
872    pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
873        let req = tl::functions::auth::ImportBotAuthorization {
874            flags: 0, api_id: self.inner.api_id,
875            api_hash: self.inner.api_hash.clone(),
876            bot_auth_token: token.to_string(),
877        };
878
879        let result = match self.invoke(&req).await {
880            Ok(x) => x,
881            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
882                let dc_id = r.value.unwrap_or(2) as i32;
883                self.migrate_to(dc_id).await?;
884                self.invoke(&req).await?
885            }
886            Err(e) => return Err(e),
887        };
888
889        let name = match result {
890            tl::enums::auth::Authorization::Authorization(a) => {
891                self.cache_user(&a.user).await;
892                Self::extract_user_name(&a.user)
893            }
894            tl::enums::auth::Authorization::SignUpRequired(_) => {
895                panic!("unexpected SignUpRequired during bot sign-in")
896            }
897        };
898        log::info!("[layer] Bot signed in ✓  ({name})");
899        Ok(name)
900    }
901
902    /// Request a login code for a user account.
903    pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
904        use tl::enums::auth::SentCode;
905
906        let req = self.make_send_code_req(phone);
907        let body = match self.rpc_call_raw(&req).await {
908            Ok(b) => b,
909            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
910                let dc_id = r.value.unwrap_or(2) as i32;
911                self.migrate_to(dc_id).await?;
912                self.rpc_call_raw(&req).await?
913            }
914            Err(e) => return Err(e),
915        };
916
917        let mut cur = Cursor::from_slice(&body);
918        let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
919            SentCode::SentCode(s) => s.phone_code_hash,
920            SentCode::Success(_)  => return Err(InvocationError::Deserialize("unexpected Success".into())),
921            SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
922        };
923        log::info!("[layer] Login code sent");
924        Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
925    }
926
927    /// Complete sign-in with the code sent to the phone.
928    pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
929        let req = tl::functions::auth::SignIn {
930            phone_number:    token.phone.clone(),
931            phone_code_hash: token.phone_code_hash.clone(),
932            phone_code:      Some(code.trim().to_string()),
933            email_verification: None,
934        };
935
936        let body = match self.rpc_call_raw(&req).await {
937            Ok(b) => b,
938            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
939                let dc_id = r.value.unwrap_or(2) as i32;
940                self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
941                self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
942            }
943            Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
944                let t = self.get_password_info().await.map_err(SignInError::Other)?;
945                return Err(SignInError::PasswordRequired(Box::new(t)));
946            }
947            Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
948            Err(e) => return Err(SignInError::Other(e)),
949        };
950
951        let mut cur = Cursor::from_slice(&body);
952        match tl::enums::auth::Authorization::deserialize(&mut cur)
953            .map_err(|e| SignInError::Other(e.into()))?
954        {
955            tl::enums::auth::Authorization::Authorization(a) => {
956                self.cache_user(&a.user).await;
957                let name = Self::extract_user_name(&a.user);
958                log::info!("[layer] Signed in ✓  Welcome, {name}!");
959                Ok(name)
960            }
961            tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
962        }
963    }
964
965    /// Complete 2FA login.
966    pub async fn check_password(
967        &self,
968        token:    PasswordToken,
969        password: impl AsRef<[u8]>,
970    ) -> Result<String, InvocationError> {
971        let pw   = token.password;
972        let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
973        let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
974        let g_b  = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
975        let a    = pw.secure_random;
976        let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
977
978        let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
979        let req = tl::functions::auth::CheckPassword {
980            password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
981                tl::types::InputCheckPasswordSrp {
982                    srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
983                },
984            ),
985        };
986
987        let body = self.rpc_call_raw(&req).await?;
988        let mut cur = Cursor::from_slice(&body);
989        match tl::enums::auth::Authorization::deserialize(&mut cur)? {
990            tl::enums::auth::Authorization::Authorization(a) => {
991                self.cache_user(&a.user).await;
992                let name = Self::extract_user_name(&a.user);
993                log::info!("[layer] 2FA ✓  Welcome, {name}!");
994                Ok(name)
995            }
996            tl::enums::auth::Authorization::SignUpRequired(_) =>
997                Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
998        }
999    }
1000
1001    /// Sign out and invalidate the current session.
1002    pub async fn sign_out(&self) -> Result<bool, InvocationError> {
1003        let req = tl::functions::auth::LogOut {};
1004        match self.rpc_call_raw(&req).await {
1005            Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
1006            Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
1007            Err(e) => Err(e),
1008        }
1009    }
1010
1011    // ── Get self ───────────────────────────────────────────────────────────
1012
1013    /// Fetch information about the logged-in user.
1014    pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
1015        let req = tl::functions::users::GetUsers {
1016            id: vec![tl::enums::InputUser::UserSelf],
1017        };
1018        let body    = self.rpc_call_raw(&req).await?;
1019        let mut cur = Cursor::from_slice(&body);
1020        let users   = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1021        self.cache_users_slice(&users).await;
1022        users.into_iter().find_map(|u| match u {
1023            tl::enums::User::User(u) => Some(u),
1024            _ => None,
1025        }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
1026    }
1027
1028    // ── Updates ────────────────────────────────────────────────────────────
1029
1030    /// Return an [`UpdateStream`] that yields incoming [`Update`]s.
1031    ///
1032    /// The reader task (started inside `connect()`) sends all updates to
1033    /// `inner.update_tx`. This method proxies those updates into a fresh
1034    /// caller-owned channel — typically called once per bot/app loop.
1035    pub fn stream_updates(&self) -> UpdateStream {
1036        let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
1037        // The internal channel is: reader_task → update_tx → _update_rx
1038        // We forward _update_rx → caller_tx so the caller gets an owned stream.
1039        let internal_rx = self._update_rx.clone();
1040        tokio::spawn(async move {
1041            // Lock once and hold — this is the sole consumer of the internal channel.
1042            let mut guard = internal_rx.lock().await;
1043            while let Some(upd) = guard.recv().await {
1044                if caller_tx.send(upd).is_err() { break; }
1045            }
1046        });
1047        UpdateStream { rx }
1048    }
1049
1050    // ── Network hint ───────────────────────────────────────────────────────
1051
1052    /// Signal that network connectivity has been restored.
1053    ///
1054    /// Call this from platform network-change callbacks — Android's
1055    /// `ConnectivityManager`, iOS `NWPathMonitor`, or any other OS hook —
1056    /// to make the client attempt an immediate reconnect instead of waiting
1057    /// for the exponential backoff timer to expire.
1058    ///
1059    /// Safe to call at any time: if the connection is healthy the hint is
1060    /// silently ignored by the reader task; if it is in a backoff loop it
1061    /// wakes up and tries again right away.
1062    pub fn signal_network_restored(&self) {
1063        let _ = self.inner.network_hint_tx.send(());
1064    }
1065
1066    // ── Reader task ────────────────────────────────────────────────────────
1067    // Decrypts frames without holding any lock, then routes:
1068    //   rpc_result  → pending map (oneshot to waiting RPC caller)
1069    //   update      → update_tx  (delivered to stream_updates consumers)
1070    //   bad_server_salt → updates writer salt
1071    //
1072    // On error: drains pending with Io errors (so AutoSleep retries callers),
1073    // then loops with exponential backoff until reconnect succeeds.
1074    // network_hint_rx lets external callers (Android/iOS) skip the backoff.
1075    //
1076    // DC migration / reconnect: the new read half arrives via new_conn_rx.
1077    // The select! between recv_frame_owned and new_conn_rx.recv() ensures we
1078    // switch to the new connection immediately, without waiting for the next
1079    // frame on the old (now stale) connection.
1080
1081    // ── Reader task supervisor ────────────────────────────────────────────────
1082    //
1083    // run_reader_task is the outer supervisor. It wraps reader_loop in a
1084    // restart loop so that if reader_loop ever exits for any reason other than
1085    // a clean shutdown request, it is automatically reconnected and restarted.
1086    //
1087    // This mirrors what Telegram Desktop does: the network thread is considered
1088    // infrastructure and is never allowed to die permanently.
1089    //
1090    // Restart sequence on unexpected exit:
1091    //   1. Drain all pending RPCs with ConnectionReset so callers unblock.
1092    //   2. Exponential-backoff reconnect loop (500 ms → 30 s cap) until TCP
1093    //      succeeds, respecting the shutdown token at every sleep point.
1094    //   3. Spawn init_connection in a background task (same deadlock-safe
1095    //      pattern as do_reconnect_loop) and pass the oneshot receiver as the
1096    //      initial_init_rx to the restarted reader_loop.
1097    //   4. reader_loop picks up init_rx immediately on its first iteration and
1098    //      handles success/failure exactly like a mid-session reconnect.
1099    #[allow(clippy::too_many_arguments)]
1100    async fn run_reader_task(
1101        &self,
1102        read_half:   OwnedReadHalf,
1103        frame_kind:  FrameKind,
1104        auth_key:    [u8; 256],
1105        session_id:  i64,
1106        mut new_conn_rx:      mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1107        mut network_hint_rx:  mpsc::UnboundedReceiver<()>,
1108        shutdown_token:       CancellationToken,
1109    ) {
1110        let mut rh  = read_half;
1111        let mut fk  = frame_kind;
1112        let mut ak  = auth_key;
1113        let mut sid = session_id;
1114        // On first start no init is needed (connect() already called it).
1115        // On restarts we pass the spawned init task so reader_loop handles it.
1116        let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
1117        let mut restart_count: u32 = 0;
1118
1119        loop {
1120            tokio::select! {
1121                // ── Clean shutdown ────────────────────────────────────────────
1122                _ = shutdown_token.cancelled() => {
1123                    log::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
1124                    let mut pending = self.inner.pending.lock().await;
1125                    for (_, tx) in pending.drain() {
1126                        let _ = tx.send(Err(InvocationError::Dropped));
1127                    }
1128                    return;
1129                }
1130
1131                // ── reader_loop ───────────────────────────────────────────────
1132                _ = self.reader_loop(
1133                        rh, fk, ak, sid,
1134                        restart_init_rx.take(),
1135                        &mut new_conn_rx, &mut network_hint_rx,
1136                    ) => {}
1137            }
1138
1139            // If we reach here, reader_loop returned without a shutdown signal.
1140            // This should never happen in normal operation — treat it as a fault.
1141            if shutdown_token.is_cancelled() {
1142                log::info!("[layer] Reader task: exiting after loop (shutdown).");
1143                return;
1144            }
1145
1146            restart_count += 1;
1147            log::error!(
1148                "[layer] Reader loop exited unexpectedly (restart #{restart_count}) —                  supervisor reconnecting …"
1149            );
1150
1151            // Step 1: drain all pending RPCs so callers don't hang.
1152            {
1153                let mut pending = self.inner.pending.lock().await;
1154                for (_, tx) in pending.drain() {
1155                    let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
1156                        std::io::ErrorKind::ConnectionReset,
1157                        "reader task restarted",
1158                    ))));
1159                }
1160            }
1161
1162            // Step 2: reconnect with exponential backoff, honouring shutdown.
1163            let mut delay_ms = RECONNECT_BASE_MS;
1164            let new_conn = loop {
1165                log::info!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
1166                tokio::select! {
1167                    _ = shutdown_token.cancelled() => {
1168                        log::info!("[layer] Supervisor: shutdown during reconnect, exiting.");
1169                        return;
1170                    }
1171                    _ = sleep(Duration::from_millis(delay_ms)) => {}
1172                }
1173
1174                // do_reconnect ignores both params (_old_auth_key, _old_frame_kind) —
1175                // it re-reads everything from ClientInner. rh/fk/ak/sid were moved
1176                // into reader_loop, so we pass dummies here; fresh values come back
1177                // from the Ok result and replace them below.
1178                let dummy_ak = [0u8; 256];
1179                let dummy_fk = FrameKind::Abridged;
1180                match self.do_reconnect(&dummy_ak, &dummy_fk).await {
1181                    Ok(conn) => break conn,
1182                    Err(e)   => {
1183                        log::warn!("[layer] Supervisor: reconnect failed ({e})");
1184                        let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
1185                        delay_ms = jitter_delay(next).as_millis() as u64;
1186                    }
1187                }
1188            };
1189
1190            let (new_rh, new_fk, new_ak, new_sid) = new_conn;
1191            rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
1192
1193            // Step 3: spawn init_connection (cannot await inline — reader must
1194            // be running to route the RPC response, or we deadlock).
1195            let (init_tx, init_rx) = oneshot::channel();
1196            let c   = self.clone();
1197            let utx = self.inner.update_tx.clone();
1198            tokio::spawn(async move {
1199                // Respect FLOOD_WAIT (same as do_reconnect_loop).
1200                let result = loop {
1201                    match c.init_connection().await {
1202                        Ok(()) => break Ok(()),
1203                        Err(InvocationError::Rpc(ref r))
1204                            if r.flood_wait_seconds().is_some() =>
1205                        {
1206                            let secs = r.flood_wait_seconds().unwrap();
1207                            log::warn!(
1208                                "[layer] Supervisor init_connection FLOOD_WAIT_{secs} — waiting"
1209                            );
1210                            sleep(Duration::from_secs(secs + 1)).await;
1211                        }
1212                        Err(e) => break Err(e),
1213                    }
1214                };
1215                if result.is_ok()
1216                    && let Ok(missed) = c.get_difference().await {
1217                    for u in missed { let _ = utx.send(u); }
1218                }
1219                let _ = init_tx.send(result);
1220            });
1221            restart_init_rx = Some(init_rx);
1222
1223            log::info!("[layer] Supervisor: restarting reader loop (restart #{restart_count}) …");
1224            // Loop back → reader_loop restarts with the fresh connection.
1225        }
1226    }
1227
1228    #[allow(clippy::too_many_arguments)]
1229    async fn reader_loop(
1230        &self,
1231        mut rh:          OwnedReadHalf,
1232        mut fk:          FrameKind,
1233        mut ak:          [u8; 256],
1234        mut sid:         i64,
1235        // When Some, the supervisor has already spawned init_connection on our
1236        // behalf (supervisor restart path). On first start this is None.
1237        initial_init_rx:  Option<oneshot::Receiver<Result<(), InvocationError>>>,
1238        new_conn_rx:      &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1239        network_hint_rx:  &mut mpsc::UnboundedReceiver<()>,
1240    ) {
1241        // Tracks an in-flight init_connection task spawned after every reconnect.
1242        // The reader loop must keep routing frames while we wait so the RPC
1243        // response can reach its oneshot sender (otherwise → 30 s self-deadlock).
1244        // If init fails we re-enter the reconnect loop immediately.
1245        let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
1246        // How many consecutive init_connection failures have occurred on the
1247        // *current* auth key.  We retry with the same key up to 2 times before
1248        // assuming the key is stale and clearing it for a fresh DH handshake.
1249        // This prevents a transient 30 s timeout from nuking a valid session.
1250        let mut init_fail_count: u32 = 0;
1251
1252        loop {
1253            tokio::select! {
1254                // ── Normal frame (or application-level keepalive timeout) ─────
1255                outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
1256                    match outcome {
1257                        FrameOutcome::Frame(mut raw) => {
1258                            let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
1259                                Ok(m)  => m,
1260                                Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
1261                            };
1262                            if msg.salt != 0 {
1263                                self.inner.writer.lock().await.enc.salt = msg.salt;
1264                            }
1265                            self.route_frame(msg.body, msg.msg_id).await;
1266
1267                            // G-04: flush any accumulated acks as a standalone MsgsAck.
1268                            // (When an RPC is in flight they're bundled into a container
1269                            // instead — this covers the case where only updates arrived.)
1270                            let acks_to_send = {
1271                                let mut w = self.inner.writer.lock().await;
1272                                let v: Vec<i64> = w.pending_ack.drain(..).collect();
1273                                v
1274                            };
1275                            if !acks_to_send.is_empty() {
1276                                let mut w = self.inner.writer.lock().await;
1277                                let fk = w.frame_kind.clone();
1278                                let ack_body = build_msgs_ack_body(&acks_to_send);
1279                                let (wire, _) = w.enc.pack_body_with_msg_id(&ack_body, false);
1280                                send_frame_write(&mut w.write_half, &wire, &fk).await.ok();
1281                                log::trace!("[layer] G-04 flushed {} acks", acks_to_send.len());
1282                            }
1283                        }
1284
1285                        FrameOutcome::Error(e) => {
1286                            log::warn!("[layer] Reader: connection error: {e}");
1287                            drop(init_rx.take()); // discard any in-flight init
1288
1289                            // Fail all in-flight RPCs immediately so AutoSleep
1290                            // retries them as soon as we reconnect.
1291                            {
1292                                let mut pending = self.inner.pending.lock().await;
1293                                let msg = e.to_string();
1294                                for (_, tx) in pending.drain() {
1295                                    let _ = tx.send(Err(InvocationError::Io(
1296                                        std::io::Error::new(
1297                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
1298                                }
1299                            }
1300
1301                            match self.do_reconnect_loop(
1302                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1303                                network_hint_rx,
1304                            ).await {
1305                                Some(rx) => { init_rx = Some(rx); }
1306                                None     => return, // shutdown requested
1307                            }
1308                        }
1309
1310                        FrameOutcome::Keepalive => {} // ping sent successfully; loop
1311                    }
1312                }
1313
1314                // ── DC migration / deliberate reconnect ──────────────────────
1315                maybe = new_conn_rx.recv() => {
1316                    if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
1317                        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
1318                        log::info!("[layer] Reader: switched to new connection.");
1319                    } else {
1320                        break; // reconnect_tx dropped → client is shutting down
1321                    }
1322                }
1323
1324
1325                // ── init_connection result (polled only when Some) ────────────
1326                init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
1327                    init_rx = None;
1328                    match init_result {
1329                        Ok(Ok(())) => {
1330                            init_fail_count = 0;
1331                            // Fully initialised. Persist updated DC/session state.
1332                            // Retry up to 3 times — a transient I/O error (e.g. the
1333                            // filesystem briefly unavailable on Android) should not
1334                            // cause the auth key to diverge from disk permanently.
1335                            log::info!("[layer] Reconnected to Telegram ✓ — session live, replaying missed updates …");
1336                            for attempt in 1u8..=3 {
1337                                match self.save_session().await {
1338                                    Ok(()) => break,
1339                                    Err(e) if attempt < 3 => {
1340                                        log::warn!(
1341                                            "[layer] save_session failed (attempt {attempt}/3): {e}"
1342                                        );
1343                                        sleep(Duration::from_millis(500)).await;
1344                                    }
1345                                    Err(e) => {
1346                                        log::error!(
1347                                            "[layer] save_session permanently failed after 3 attempts: {e}"
1348                                        );
1349                                    }
1350                                }
1351                            }
1352                        }
1353
1354                        Ok(Err(e)) => {
1355                            // TCP connected but init RPC failed.
1356                            //
1357                            // Grammers' approach: only treat the key as stale when
1358                            // Telegram sends a definitive "bad auth key" signal —
1359                            // a -404 transport error code.  A 30 s timeout is almost
1360                            // always a transient network issue and must NOT clear the
1361                            // session (doing so creates a brand-new session in Telegram's
1362                            // active devices list and orphans the old one).
1363                            let key_is_stale = match &e {
1364                                // Telegram's abridged-transport error code for invalid key
1365                                InvocationError::Rpc(r) if r.code == -404 => true,
1366                                // Early EOF immediately after connecting = server rejected key
1367                                InvocationError::Io(io) if io.kind() == std::io::ErrorKind::UnexpectedEof
1368                                    || io.kind() == std::io::ErrorKind::ConnectionReset => true,
1369                                // 30 s timeout → transient; keep the key and retry
1370                                _ => false,
1371                            };
1372
1373                            if key_is_stale {
1374                                log::warn!(
1375                                    "[layer] init_connection failed with definitive bad-key signal ({e}) \
1376                                     — clearing auth key for fresh DH …"
1377                                );
1378                                init_fail_count = 0;
1379                                let home_dc_id = *self.inner.home_dc_id.lock().await;
1380                                let mut opts = self.inner.dc_options.lock().await;
1381                                if let Some(entry) = opts.get_mut(&home_dc_id) {
1382                                    entry.auth_key = None;
1383                                }
1384                            } else {
1385                                init_fail_count += 1;
1386                                log::warn!(
1387                                    "[layer] init_connection failed transiently (attempt {init_fail_count}, {e}) \
1388                                     — retrying with same key …"
1389                                );
1390                            }
1391                            {
1392                                let mut pending = self.inner.pending.lock().await;
1393                                let msg = e.to_string();
1394                                for (_, tx) in pending.drain() {
1395                                    let _ = tx.send(Err(InvocationError::Io(
1396                                        std::io::Error::new(
1397                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
1398                                }
1399                            }
1400                            match self.do_reconnect_loop(
1401                                0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
1402                            ).await {
1403                                Some(rx) => { init_rx = Some(rx); }
1404                                None     => return,
1405                            }
1406                        }
1407
1408                        Err(_) => {
1409                            // init task was dropped (shouldn't normally happen).
1410                            log::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
1411                            match self.do_reconnect_loop(
1412                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1413                                network_hint_rx,
1414                            ).await {
1415                                Some(rx) => { init_rx = Some(rx); }
1416                                None     => return,
1417                            }
1418                        }
1419                    }
1420                }
1421            }
1422        }
1423    }
1424
1425    /// Route a decrypted MTProto frame body to either a pending RPC caller or update_tx.
1426    async fn route_frame(&self, body: Vec<u8>, msg_id: i64) {
1427        if body.len() < 4 { return; }
1428        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1429
1430        match cid {
1431            ID_RPC_RESULT => {
1432                if body.len() < 12 { return; }
1433                let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1434                let inner = body[12..].to_vec();
1435                // G-04: ack the rpc_result container message
1436                self.inner.writer.lock().await.pending_ack.push(msg_id);
1437                let result = unwrap_envelope(inner);
1438                if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1439                    // G-02: request resolved — remove from sent_bodies
1440                    self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1441                    let to_send = match result {
1442                        Ok(EnvelopeResult::Payload(p)) => Ok(p),
1443                        Ok(EnvelopeResult::Updates(us)) => {
1444                            for u in us { let _ = self.inner.update_tx.send(u); }
1445                            Ok(vec![])
1446                        }
1447                        Ok(EnvelopeResult::None) => Ok(vec![]),
1448                        Err(e) => Err(e),
1449                    };
1450                    let _ = tx.send(to_send);
1451                }
1452            }
1453            ID_RPC_ERROR => {
1454                log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
1455            }
1456            ID_MSG_CONTAINER => {
1457                if body.len() < 8 { return; }
1458                let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1459                let mut pos = 8usize;
1460                for _ in 0..count {
1461                    if pos + 16 > body.len() { break; }
1462                    // Extract inner msg_id for correct ack tracking
1463                    let inner_msg_id = i64::from_le_bytes(body[pos..pos + 8].try_into().unwrap());
1464                    let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1465                    pos += 16;
1466                    if pos + inner_len > body.len() { break; }
1467                    let inner = body[pos..pos + inner_len].to_vec();
1468                    pos += inner_len;
1469                    Box::pin(self.route_frame(inner, inner_msg_id)).await;
1470                }
1471            }
1472            ID_GZIP_PACKED => {
1473                let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
1474                if let Ok(inflated) = gz_inflate(&bytes) {
1475                    // pass same outer msg_id — gzip has no msg_id of its own
1476                    Box::pin(self.route_frame(inflated, msg_id)).await;
1477                }
1478            }
1479            ID_BAD_SERVER_SALT => {
1480                // bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int new_server_salt:long
1481                // body[0..4]   = constructor
1482                // body[4..12]  = bad_msg_id
1483                // body[12..16] = bad_msg_seqno
1484                // body[16..24] = new_server_salt  ← was wrongly [8..16]
1485                if body.len() >= 24 {
1486                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1487                    let new_salt   = i64::from_le_bytes(body[16..24].try_into().unwrap());
1488
1489                    // Update session salt so re-sent request uses the correct salt.
1490                    self.inner.writer.lock().await.enc.salt = new_salt;
1491                    log::debug!("[layer] bad_server_salt: bad_msg_id={bad_msg_id} salt={new_salt:#x}");
1492
1493                    // Re-transmit the original request under the new salt.
1494                    {
1495                        let mut w = self.inner.writer.lock().await;
1496                        if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1497                            let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1498                            let fk = w.frame_kind.clone();
1499                            w.sent_bodies.insert(new_msg_id, orig_body);
1500                            // Drop writer before touching pending (lock-order safety).
1501                            drop(w);
1502                            let mut pending = self.inner.pending.lock().await;
1503                            if let Some(tx) = pending.remove(&bad_msg_id) {
1504                                pending.insert(new_msg_id, tx);
1505                                drop(pending);
1506                                let mut w = self.inner.writer.lock().await;
1507                                if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1508                                    log::warn!("[layer] bad_server_salt re-send failed: {e}");
1509                                } else {
1510                                    log::debug!("[layer] bad_server_salt re-sent {bad_msg_id}→{new_msg_id}");
1511                                }
1512                            }
1513                        }
1514                    }
1515
1516                    // G-08: proactively refresh salt pool in background.
1517                    let inner = Arc::clone(&self.inner);
1518                    tokio::spawn(async move {
1519                        log::debug!("[layer] G-08 proactive GetFutureSalts …");
1520                        // get_future_salts#b921bd04 num:int = FutureSalts
1521                        let mut req_body = Vec::with_capacity(8);
1522                        req_body.extend_from_slice(&0xb921bd04_u32.to_le_bytes());
1523                        req_body.extend_from_slice(&64_i32.to_le_bytes());
1524                        // G-09 fix: store in sent_bodies so bad_msg_notify can re-send it.
1525                        let (wire, fs_msg_id) = {
1526                            let mut w = inner.writer.lock().await;
1527                            let (wire, id) = w.enc.pack_body_with_msg_id(&req_body, true);
1528                            w.sent_bodies.insert(id, req_body);
1529                            (wire, id)
1530                        };
1531                        let fk = inner.writer.lock().await.frame_kind.clone();
1532                        let (tx, rx) = tokio::sync::oneshot::channel();
1533                        inner.pending.lock().await.insert(fs_msg_id, tx);
1534                        {
1535                            let mut w = inner.writer.lock().await;
1536                            if send_frame_write(&mut w.write_half, &wire, &fk).await.is_err() {
1537                                inner.pending.lock().await.remove(&fs_msg_id);
1538                                inner.writer.lock().await.sent_bodies.remove(&fs_msg_id);
1539                                return;
1540                            }
1541                        }
1542                        let _ = tokio::time::timeout(
1543                            std::time::Duration::from_secs(30), rx
1544                        ).await;
1545                    });
1546                }
1547            }
1548            ID_PONG => {
1549                // Pong is the server's reply to Ping — NOT inside rpc_result.
1550                // pong#347773c5  msg_id:long  ping_id:long
1551                // body[4..12] = msg_id of the original Ping → key in pending map
1552                if body.len() >= 20 {
1553                    let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1554                    if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
1555                        self.inner.writer.lock().await.sent_bodies.remove(&ping_msg_id);
1556                        let _ = tx.send(Ok(body));
1557                    }
1558                }
1559            }
1560            // ── G-09: FutureSalts arrives bare (like Pong) ───────────────────
1561            ID_FUTURE_SALTS => {
1562                // future_salts#ae500895
1563                //   [0..4]   constructor
1564                //   [4..12]  req_msg_id (long)
1565                //   [12..16] now (int)
1566                //   [16..20] vector constructor 0x1cb5c415
1567                //   [20..24] count (int)          ← was wrongly [16..20]
1568                //   per entry (bare, no constructor):
1569                //     [+0..+4]  valid_since (int)
1570                //     [+4..+8]  valid_until (int)
1571                //     [+8..+16] salt (long)
1572                //   first entry starts at byte 24 → salt at [32..40] ← was [28..36]
1573                if body.len() >= 12 {
1574                    let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1575                    if body.len() >= 40 {
1576                        let count = u32::from_le_bytes(body[20..24].try_into().unwrap()) as usize;
1577                        if count > 0 {
1578                            let salt_val = i64::from_le_bytes(body[32..40].try_into().unwrap());
1579                            self.inner.writer.lock().await.enc.salt = salt_val;
1580                            log::debug!("[layer] G-09 FutureSalts: salt={salt_val:#x} count={count}");
1581                        }
1582                    }
1583                    if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1584                        self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1585                        let _ = tx.send(Ok(body));
1586                    }
1587                }
1588            }
1589            ID_NEW_SESSION => {
1590                // new_session_created#9ec20908 first_msg_id:long unique_id:long server_salt:long
1591                if body.len() >= 28 {
1592                    let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
1593                    self.inner.writer.lock().await.enc.salt = server_salt;
1594                    log::debug!("[layer] new_session_created — salt reset to {server_salt:#x}");
1595                }
1596            }
1597            // ── G-02 + G-03 + G-12: bad_msg_notification ────────────────────
1598            ID_BAD_MSG_NOTIFY => {
1599                // bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int error_code:int
1600                if body.len() < 20 { return; }
1601                let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1602                let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
1603                log::warn!("[layer] bad_msg_notification: msg_id={bad_msg_id} code={error_code}");
1604
1605                // Phase 1: hold writer only for enc-state mutations + packing.
1606                // The lock is dropped BEFORE we touch `pending`, eliminating the
1607                // writer→pending lock-order deadlock that existed before this fix.
1608                let resend: Option<(Vec<u8>, i64, FrameKind)> = {
1609                    let mut w = self.inner.writer.lock().await;
1610                    // G-12: correct clock skew on codes 16/17
1611                    if error_code == 16 || error_code == 17 {
1612                        w.enc.correct_time_offset(bad_msg_id);
1613                    }
1614                    // G-03: correct seq_no on codes 32/33
1615                    if error_code == 32 || error_code == 33 {
1616                        w.enc.correct_seq_no(error_code);
1617                    }
1618                    // G-02: pack re-send if we have the original body.
1619                    if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1620                        let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1621                        let fk = w.frame_kind.clone();
1622                        w.sent_bodies.insert(new_msg_id, orig_body);
1623                        Some((wire, new_msg_id, fk))
1624                    } else {
1625                        None
1626                    }
1627                }; // ← writer lock released here
1628
1629                match resend {
1630                    Some((wire, new_msg_id, fk)) => {
1631                        // Phase 2: re-key pending (no writer lock held).
1632                        let has_waiter = {
1633                            let mut pending = self.inner.pending.lock().await;
1634                            if let Some(tx) = pending.remove(&bad_msg_id) {
1635                                pending.insert(new_msg_id, tx);
1636                                true
1637                            } else {
1638                                false
1639                            }
1640                        };
1641                        if has_waiter {
1642                            // Phase 3: re-acquire writer only for TCP send.
1643                            let mut w = self.inner.writer.lock().await;
1644                            if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1645                                log::warn!("[layer] G-02 re-send failed: {e}");
1646                                w.sent_bodies.remove(&new_msg_id);
1647                            } else {
1648                                log::debug!("[layer] G-02 re-sent {bad_msg_id}→{new_msg_id}");
1649                            }
1650                        } else {
1651                            self.inner.writer.lock().await.sent_bodies.remove(&new_msg_id);
1652                        }
1653                    }
1654                    None => {
1655                        // Not in sent_bodies — surface retriable error to the waiter.
1656                        if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
1657                            let _ = tx.send(Err(InvocationError::Deserialize(
1658                                format!("bad_msg_notification code={error_code}")
1659                            )));
1660                        }
1661                    }
1662                }
1663            }
1664            // ── G-11: MsgDetailedInfo → ack the answer_msg_id ───────────────
1665            ID_MSG_DETAILED_INFO => {
1666                // msg_detailed_info#276d3ec6 msg_id:long answer_msg_id:long bytes:int status:int
1667                // body[4..12]  = msg_id (original request)
1668                // body[12..20] = answer_msg_id (what to ack)
1669                if body.len() >= 20 {
1670                    let answer_msg_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
1671                    self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1672                    log::trace!("[layer] G-11 MsgDetailedInfo: queued ack for answer_msg_id={answer_msg_id}");
1673                }
1674            }
1675            ID_MSG_NEW_DETAIL_INFO => {
1676                // msg_new_detailed_info#809db6df answer_msg_id:long bytes:int status:int
1677                // body[4..12] = answer_msg_id
1678                if body.len() >= 12 {
1679                    let answer_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1680                    self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1681                    log::trace!("[layer] G-11 MsgNewDetailedInfo: queued ack for {answer_msg_id}");
1682                }
1683            }
1684            // ── G-14: MsgResendReq → re-send the requested msg_ids ──────────
1685            ID_MSG_RESEND_REQ => {
1686                // msg_resend_req#7d861a08 msg_ids:Vector<long>
1687                // body[4..8]   = 0x1cb5c415 (Vector constructor)
1688                // body[8..12]  = count
1689                // body[12..]   = msg_ids
1690                if body.len() >= 12 {
1691                    let count = u32::from_le_bytes(body[8..12].try_into().unwrap()) as usize;
1692                    let mut w = self.inner.writer.lock().await;
1693                    let fk = w.frame_kind.clone();
1694                    for i in 0..count {
1695                        let off = 12 + i * 8;
1696                        if off + 8 > body.len() { break; }
1697                        let resend_id = i64::from_le_bytes(body[off..off + 8].try_into().unwrap());
1698                        if let Some(orig_body) = w.sent_bodies.remove(&resend_id) {
1699                            let (wire, new_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1700                            // Re-key the pending waiter
1701                            let mut pending = self.inner.pending.lock().await;
1702                            if let Some(tx) = pending.remove(&resend_id) {
1703                                pending.insert(new_id, tx);
1704                            }
1705                            drop(pending);
1706                            w.sent_bodies.insert(new_id, orig_body);
1707                            send_frame_write(&mut w.write_half, &wire, &fk).await.ok();
1708                            log::debug!("[layer] G-14 MsgResendReq: resent {resend_id} → {new_id}");
1709                        }
1710                    }
1711                }
1712            }
1713            // ── G-13: log DestroySession outcomes ───────────────────────────
1714            0xe22045fc => {
1715                log::warn!("[layer] destroy_session_ok received — session terminated by server");
1716            }
1717            0x62d350c9 => {
1718                log::warn!("[layer] destroy_session_none received — session was already gone");
1719            }
1720            ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
1721            | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
1722            | ID_UPDATES_TOO_LONG => {
1723                // G-04: ack update frames too
1724                self.inner.writer.lock().await.pending_ack.push(msg_id);
1725                // Bug #1 fix: route through pts/qts/seq gap-checkers
1726                self.dispatch_updates(&body).await;
1727            }
1728            _ => {}
1729        }
1730    }
1731
1732
1733    // ── Bug #1: pts-aware update dispatch ────────────────────────────────────
1734
1735    /// Parse an incoming update container and route each update through the
1736    /// pts/qts/seq gap-checkers before forwarding to `update_tx`.
1737    async fn dispatch_updates(&self, body: &[u8]) {
1738        if body.len() < 4 { return; }
1739        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1740
1741        // updatesTooLong — we must call getDifference to recover missed updates.
1742        if cid == 0xe317af7e_u32 {
1743            log::warn!("[layer] updatesTooLong — getDifference");
1744            match self.get_difference().await {
1745                Ok(updates) => { for u in updates { let _ = self.inner.update_tx.send(u); } }
1746                Err(e) => log::warn!("[layer] getDifference after updatesTooLong: {e}"),
1747            }
1748            return;
1749        }
1750
1751        // For updateShortMessage / updateShortChatMessage we use the existing
1752        // high-level parser (they have pts but no bare tl::enums::Update wrapper).
1753        if cid == 0x313bc7f8 || cid == 0x4d6deea5 {
1754            // Parse pts out of the short-message header.
1755            // updateShortMessage: flags(4) id(4) user_id(8) ... pts(?) pts_count(?)
1756            // Easier: use parse_updates which already handles them, then gap-check.
1757            for u in update::parse_updates(body) {
1758                let _ = self.inner.update_tx.send(u);
1759            }
1760            return;
1761        }
1762
1763        // Check outer seq for Updates / UpdatesCombined containers.
1764        {
1765            use layer_tl_types::{Cursor, Deserializable};
1766            let mut cur = Cursor::from_slice(body);
1767            let seq_info: Option<(i32, i32)> = match cid {
1768                0x74ae4240 => {
1769                    // updates#74ae4240
1770                    match tl::enums::Updates::deserialize(&mut cur) {
1771                        Ok(tl::enums::Updates::Updates(u)) => Some((u.seq, u.seq)),
1772                        _ => None,
1773                    }
1774                }
1775                0x725b04c3 => {
1776                    // updatesCombined#725b04c3
1777                    match tl::enums::Updates::deserialize(&mut cur) {
1778                        Ok(tl::enums::Updates::Combined(u)) => Some((u.seq, u.seq_start)),
1779                        _ => None,
1780                    }
1781                }
1782                _ => None,
1783            };
1784            if let Some((seq, seq_start)) = seq_info
1785                && seq != 0 {
1786                match self.check_and_fill_seq_gap(seq, seq_start).await {
1787                    Ok(extra) => { for u in extra { let _ = self.inner.update_tx.send(u); } }
1788                    Err(e) => log::warn!("[layer] seq gap fill: {e}"),
1789                }
1790            }
1791        }
1792
1793        // Flatten the container into bare tl::enums::Update items.
1794        use layer_tl_types::{Cursor, Deserializable};
1795        let mut cur = Cursor::from_slice(body);
1796        let raw: Vec<tl::enums::Update> = match cid {
1797            0x78d4dec1 => { // updateShort
1798                match tl::types::UpdateShort::deserialize(&mut cur) {
1799                    Ok(u) => vec![u.update],
1800                    Err(_) => vec![],
1801                }
1802            }
1803            0x74ae4240 => { // updates
1804                match tl::enums::Updates::deserialize(&mut cur) {
1805                    Ok(tl::enums::Updates::Updates(u)) => u.updates,
1806                    _ => vec![],
1807                }
1808            }
1809            0x725b04c3 => { // updatesCombined
1810                match tl::enums::Updates::deserialize(&mut cur) {
1811                    Ok(tl::enums::Updates::Combined(u)) => u.updates,
1812                    _ => vec![],
1813                }
1814            }
1815            _ => vec![],
1816        };
1817
1818        for upd in raw {
1819            self.dispatch_single_update(upd).await;
1820        }
1821    }
1822
    /// Route one bare `tl::enums::Update` through the pts/qts gap-checker,
    /// then emit surviving updates to `update_tx`.
    ///
    /// The gap-checker may return extra updates (recovered from a detected
    /// gap) or swallow the current one (already-seen pts); whatever survives
    /// is forwarded to `update_tx` at the end.
    async fn dispatch_single_update(&self, upd: tl::enums::Update) {
        // Two-phase: inspect pts fields via reference first (all Copy), then
        // convert to high-level Update (consumes upd). Avoids borrow-then-move.
        enum Kind {
            // Account-wide pts sequence. `carry` = deliver the update itself
            // through the gap-checker (true) vs. consume only its pts
            // advancement (false — used for read receipts below).
            GlobalPts  { pts: i32, pts_count: i32, carry: bool },
            // Per-channel pts sequence; channel_id == 0 means the id could
            // not be extracted from the message (see ch_from_msg) and the
            // update is passed through without a channel gap-check.
            ChannelPts { channel_id: i64, pts: i32, pts_count: i32, carry: bool },
            // qts sequence (encrypted-message stream).
            Qts        { qts: i32 },
            // No sequence counter attached — forward as-is.
            Passthrough,
        }

        // Best-effort channel-id extraction: only the Message::Message variant
        // with a Peer::Channel peer yields an id; everything else → 0.
        fn ch_from_msg(msg: &tl::enums::Message) -> i64 {
            if let tl::enums::Message::Message(m) = msg
                && let tl::enums::Peer::Channel(c) = &m.peer_id { return c.channel_id; }
            0
        }

        // Classify by borrowing `upd` — all extracted fields are Copy, so the
        // subsequent move into from_single_update_pub is still allowed.
        let kind = {
            use tl::enums::Update::*;
            match &upd {
                NewMessage(u)            => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
                EditMessage(u)           => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
                DeleteMessages(u)        => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
                // Read receipts advance pts but are not themselves delivered
                // (carry: false → the gap-checker receives no payload).
                ReadHistoryInbox(u)      => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
                ReadHistoryOutbox(u)     => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
                NewChannelMessage(u)     => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
                EditChannelMessage(u)    => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
                DeleteChannelMessages(u) => Kind::ChannelPts { channel_id: u.channel_id, pts: u.pts, pts_count: u.pts_count, carry: true },
                NewEncryptedMessage(u)   => Kind::Qts { qts: u.qts },
                _                        => Kind::Passthrough,
            }
        };

        // Convert to zero-or-more high-level updates (consumes upd).
        let high = update::from_single_update_pub(upd);

        let to_send: Vec<update::Update> = match kind {
            Kind::GlobalPts { pts, pts_count, carry } => {
                // Only the first high-level update rides through the checker;
                // from_single_update_pub yields at most one for these kinds.
                let first = if carry { high.into_iter().next() } else { None };
                match self.check_and_fill_gap(pts, pts_count, first).await {
                    Ok(v) => v,
                    Err(e) => { log::warn!("[layer] pts gap: {e}"); vec![] }
                }
            }
            Kind::ChannelPts { channel_id, pts, pts_count, carry } => {
                let first = if carry { high.into_iter().next() } else { None };
                if channel_id != 0 {
                    match self.check_and_fill_channel_gap(channel_id, pts, pts_count, first).await {
                        Ok(v) => v,
                        Err(e) => { log::warn!("[layer] ch pts gap: {e}"); vec![] }
                    }
                } else {
                    // Unknown channel — can't gap-check; deliver as-is.
                    first.into_iter().collect()
                }
            }
            Kind::Qts { qts } => {
                // NOTE(review): the encrypted-message update itself is dropped
                // here (Ok(_) → vec![]); presumably it is delivered via the
                // qts gap-filler / getDifference path — confirm intended.
                match self.check_and_fill_qts_gap(qts, 1).await {
                    Ok(_) => vec![],
                    Err(e) => { log::warn!("[layer] qts gap: {e}"); vec![] }
                }
            }
            Kind::Passthrough => high,
        };

        for u in to_send {
            let _ = self.inner.update_tx.send(u);
        }
    }
1891
1892    /// Loops with exponential backoff until a TCP+DH reconnect succeeds, then
1893    /// spawns `init_connection` in a background task and returns a oneshot
1894    /// receiver for its result.
1895    ///
1896    /// - `initial_delay_ms = RECONNECT_BASE_MS` for a fresh disconnect.
1897    /// - `initial_delay_ms = 0` when TCP already worked but init failed — we
1898    ///   want to retry init immediately rather than waiting another full backoff.
1899    ///
1900    /// Returns `None` if the shutdown token fires (caller should exit).
1901    async fn do_reconnect_loop(
1902        &self,
1903        initial_delay_ms: u64,
1904        rh:  &mut OwnedReadHalf,
1905        fk:  &mut FrameKind,
1906        ak:  &mut [u8; 256],
1907        sid: &mut i64,
1908        network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
1909    ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
1910        let mut delay_ms = if initial_delay_ms == 0 {
1911            // Caller explicitly requests an immediate first attempt (e.g. init
1912            // failed but TCP is up — no reason to wait before the next try).
1913            0
1914        } else {
1915            initial_delay_ms.max(RECONNECT_BASE_MS)
1916        };
1917        loop {
1918            log::info!("[layer] Reconnecting in {delay_ms} ms …");
1919            tokio::select! {
1920                _ = sleep(Duration::from_millis(delay_ms)) => {}
1921                hint = network_hint_rx.recv() => {
1922                    hint?; // shutdown
1923                    log::info!("[layer] Network hint → skipping backoff, reconnecting now");
1924                }
1925            }
1926
1927            match self.do_reconnect(ak, fk).await {
1928                Ok((new_rh, new_fk, new_ak, new_sid)) => {
1929                    *rh = new_rh; *fk = new_fk; *ak = new_ak; *sid = new_sid;
1930                    log::info!("[layer] TCP reconnected ✓ — initialising session …");
1931
1932                    // Spawn init_connection. MUST NOT be awaited inline — the
1933                    // reader loop must resume so it can route the RPC response.
1934                    // We give back a oneshot so the reader can act on failure.
1935                    let (init_tx, init_rx) = oneshot::channel();
1936                    let c   = self.clone();
1937                    let utx = self.inner.update_tx.clone();
1938                    tokio::spawn(async move {
1939                        // Respect FLOOD_WAIT before sending the result back.
1940                        // Without this, a FLOOD_WAIT from Telegram during init
1941                        // would immediately re-trigger another reconnect attempt,
1942                        // which would itself hit FLOOD_WAIT — a ban spiral.
1943                        let result = loop {
1944                            match c.init_connection().await {
1945                                Ok(()) => break Ok(()),
1946                                Err(InvocationError::Rpc(ref r))
1947                                    if r.flood_wait_seconds().is_some() =>
1948                                {
1949                                    let secs = r.flood_wait_seconds().unwrap();
1950                                    log::warn!(
1951                                        "[layer] init_connection FLOOD_WAIT_{secs} —                                          waiting before retry"
1952                                    );
1953                                    sleep(Duration::from_secs(secs + 1)).await;
1954                                    // loop and retry init_connection
1955                                }
1956                                Err(e) => break Err(e),
1957                            }
1958                        };
1959                        if result.is_ok() {
1960                            // Replay any updates missed during the outage.
1961                            if let Ok(missed) = c.get_difference().await {
1962                                for u in missed { let _ = utx.send(u); }
1963                            }
1964                        }
1965                        let _ = init_tx.send(result);
1966                    });
1967                    return Some(init_rx);
1968                }
1969                Err(e) => {
1970                    log::warn!("[layer] Reconnect attempt failed: {e}");
1971                    // Cap at max, then apply ±20 % jitter to avoid thundering herd.
1972                    // Ensure the delay always advances by at least RECONNECT_BASE_MS
1973                    // so a 0 initial delay on the first attempt doesn't spin-loop.
1974                    let next = delay_ms.saturating_mul(2).clamp(RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1_000);
1975                    delay_ms = jitter_delay(next).as_millis() as u64;
1976                }
1977            }
1978        }
1979    }
1980
    /// Reconnect to the home DC, replace the writer, and return the new read half.
    ///
    /// On success returns `(read_half, frame_kind, auth_key_bytes, session_id)`
    /// for the fresh connection; the write half is installed into
    /// `self.inner.writer` before returning.
    ///
    /// The `_old_*` parameters are currently unused: reconnection always starts
    /// from the per-DC state saved in `self.inner.dc_options`, not from the
    /// previous connection's key/framing.
    async fn do_reconnect(
        &self,
        _old_auth_key: &[u8; 256],
        _old_frame_kind: &FrameKind,
    ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
        let home_dc_id = *self.inner.home_dc_id.lock().await;
        // Snapshot the saved DC state under a short-lived lock.
        let (addr, saved_key, first_salt, time_offset) = {
            let opts = self.inner.dc_options.lock().await;
            match opts.get(&home_dc_id) {
                Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
                // No cached entry for this DC: fall back to a hard-coded
                // address (presumably a production Telegram DC — TODO confirm).
                None    => ("149.154.167.51:443".to_string(), None, 0, 0),
            }
        };
        let socks5    = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();

        // Prefer resuming with the saved auth key (skips the DH handshake);
        // fall back to a fresh raw connection if that fails or no key exists.
        let new_conn = if let Some(key) = saved_key {
            log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
            match Connection::connect_with_key(
                &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
            ).await {
                Ok(c)   => c,
                Err(e2) => {
                    log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
                    Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
                }
            }
        } else {
            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
        };

        // Split the connection and swap the writer in; the read half is handed
        // back to the caller (the reader loop) to resume frame processing.
        let (new_writer, new_read, new_fk) = new_conn.into_writer();
        let new_ak  = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        *self.inner.writer.lock().await = new_writer;

        // NOTE: init_connection() is intentionally NOT called here.
        //
        // do_reconnect() is always called from inside the reader loop's select!,
        // which means the reader task is blocked while this function runs.
        // init_connection() sends an RPC and awaits the response — but only the
        // reader task can route that response back to the pending caller.
        // Calling it here creates a self-deadlock that times out after 30 s.
        //
        // Instead, callers are responsible for spawning init_connection() in a
        // separate task AFTER the reader loop has resumed and can process frames.

        Ok((new_read, new_fk, new_ak, new_sid))
    }
2031
2032    // ── Messaging ──────────────────────────────────────────────────────────
2033
2034    /// Send a text message. Use `"me"` for Saved Messages.
2035    pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
2036        let p = self.resolve_peer(peer).await?;
2037        self.send_message_to_peer(p, text).await
2038    }
2039
2040    /// Send a message to an already-resolved peer (plain text shorthand).
2041    pub async fn send_message_to_peer(
2042        &self,
2043        peer: tl::enums::Peer,
2044        text: &str,
2045    ) -> Result<(), InvocationError> {
2046        self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
2047    }
2048
2049    /// Send a message with full [`InputMessage`] options.
2050    pub async fn send_message_to_peer_ex(
2051        &self,
2052        peer: tl::enums::Peer,
2053        msg:  &InputMessage,
2054    ) -> Result<(), InvocationError> {
2055        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2056        let schedule = if msg.schedule_once_online {
2057            Some(0x7FFF_FFFEi32)
2058        } else {
2059            msg.schedule_date
2060        };
2061
2062        // G-45: if media is attached, route through SendMedia instead of SendMessage.
2063        if let Some(media) = &msg.media {
2064            let req = tl::functions::messages::SendMedia {
2065                silent:                   msg.silent,
2066                background:               msg.background,
2067                clear_draft:              msg.clear_draft,
2068                noforwards:               false,
2069                update_stickersets_order: false,
2070                invert_media:             msg.invert_media,
2071                allow_paid_floodskip:     false,
2072                peer:                     input_peer,
2073                reply_to:                 msg.reply_header(),
2074                media:                    media.clone(),
2075                message:                  msg.text.clone(),
2076                random_id:                random_i64(),
2077                reply_markup:             msg.reply_markup.clone(),
2078                entities:                 msg.entities.clone(),
2079                schedule_date:            schedule,
2080                schedule_repeat_period:   None,
2081                send_as:                  None,
2082                quick_reply_shortcut:     None,
2083                effect:                   None,
2084                allow_paid_stars:         None,
2085                suggested_post:           None,
2086            };
2087            return self.rpc_call_raw_pub(&req).await.map(|_| ());
2088        }
2089
2090        let req = tl::functions::messages::SendMessage {
2091            no_webpage:               msg.no_webpage,
2092            silent:                   msg.silent,
2093            background:               msg.background,
2094            clear_draft:              msg.clear_draft,
2095            noforwards:               false,
2096            update_stickersets_order: false,
2097            invert_media:             msg.invert_media,
2098            allow_paid_floodskip:     false,
2099            peer:                     input_peer,
2100            reply_to:                 msg.reply_header(),
2101            message:                  msg.text.clone(),
2102            random_id:                random_i64(),
2103            reply_markup:             msg.reply_markup.clone(),
2104            entities:                 msg.entities.clone(),
2105            schedule_date:            schedule,
2106            schedule_repeat_period:   None,
2107            send_as:                  None,
2108            quick_reply_shortcut:     None,
2109            effect:                   None,
2110            allow_paid_stars:         None,
2111            suggested_post:           None,
2112        };
2113        self.rpc_write(&req).await
2114    }
2115
2116    /// Send directly to Saved Messages.
2117    pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
2118        let req = tl::functions::messages::SendMessage {
2119            no_webpage:               false,
2120            silent:                   false,
2121            background:               false,
2122            clear_draft:              false,
2123            noforwards:               false,
2124            update_stickersets_order: false,
2125            invert_media:             false,
2126            allow_paid_floodskip:     false,
2127            peer:                     tl::enums::InputPeer::PeerSelf,
2128            reply_to:                 None,
2129            message:                  text.to_string(),
2130            random_id:                random_i64(),
2131            reply_markup:             None,
2132            entities:                 None,
2133            schedule_date:            None,
2134            schedule_repeat_period:   None,
2135            send_as:                  None,
2136            quick_reply_shortcut:     None,
2137            effect:                   None,
2138            allow_paid_stars:         None,
2139            suggested_post:           None,
2140        };
2141        self.rpc_write(&req).await
2142    }
2143
2144    /// Edit an existing message.
2145    pub async fn edit_message(
2146        &self,
2147        peer:       tl::enums::Peer,
2148        message_id: i32,
2149        new_text:   &str,
2150    ) -> Result<(), InvocationError> {
2151        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2152        let req = tl::functions::messages::EditMessage {
2153            no_webpage:    false,
2154            invert_media:  false,
2155            peer:          input_peer,
2156            id:            message_id,
2157            message:       Some(new_text.to_string()),
2158            media:         None,
2159            reply_markup:  None,
2160            entities:      None,
2161            schedule_date: None,
2162            schedule_repeat_period: None,
2163            quick_reply_shortcut_id: None,
2164        };
2165        self.rpc_write(&req).await
2166    }
2167
2168    /// Forward messages from `source` to `destination`.
2169    pub async fn forward_messages(
2170        &self,
2171        destination: tl::enums::Peer,
2172        message_ids: &[i32],
2173        source:      tl::enums::Peer,
2174    ) -> Result<(), InvocationError> {
2175        let cache = self.inner.peer_cache.lock().await;
2176        let to_peer   = cache.peer_to_input(&destination);
2177        let from_peer = cache.peer_to_input(&source);
2178        drop(cache);
2179
2180        let req = tl::functions::messages::ForwardMessages {
2181            silent:            false,
2182            background:        false,
2183            with_my_score:     false,
2184            drop_author:       false,
2185            drop_media_captions: false,
2186            noforwards:        false,
2187            from_peer,
2188            id:                message_ids.to_vec(),
2189            random_id:         (0..message_ids.len()).map(|_| random_i64()).collect(),
2190            to_peer,
2191            top_msg_id:        None,
2192            reply_to:          None,
2193            schedule_date:     None,
2194            schedule_repeat_period: None,
2195            send_as:           None,
2196            quick_reply_shortcut: None,
2197            effect:            None,
2198            video_timestamp:   None,
2199            allow_paid_stars:  None,
2200            allow_paid_floodskip: false,
2201            suggested_post:    None,
2202        };
2203        self.rpc_write(&req).await
2204    }
2205
2206    /// Delete messages by ID.
2207    pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
2208        let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
2209        self.rpc_write(&req).await
2210    }
2211
2212    /// Get messages by their IDs from a peer.
2213    pub async fn get_messages_by_id(
2214        &self,
2215        peer: tl::enums::Peer,
2216        ids:  &[i32],
2217    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2218        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2219        let id_list: Vec<tl::enums::InputMessage> = ids.iter()
2220            .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
2221            .collect();
2222        let req  = tl::functions::channels::GetMessages {
2223            channel: match &input_peer {
2224                tl::enums::InputPeer::Channel(c) =>
2225                    tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2226                        channel_id: c.channel_id, access_hash: c.access_hash
2227                    }),
2228                _ => return self.get_messages_user(input_peer, id_list).await,
2229            },
2230            id: id_list,
2231        };
2232        let body    = self.rpc_call_raw(&req).await?;
2233        let mut cur = Cursor::from_slice(&body);
2234        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2235            tl::enums::messages::Messages::Messages(m) => m.messages,
2236            tl::enums::messages::Messages::Slice(m)    => m.messages,
2237            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2238            tl::enums::messages::Messages::NotModified(_) => vec![],
2239        };
2240        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2241    }
2242
2243    async fn get_messages_user(
2244        &self,
2245        _peer: tl::enums::InputPeer,
2246        ids:   Vec<tl::enums::InputMessage>,
2247    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2248        let req = tl::functions::messages::GetMessages { id: ids };
2249        let body    = self.rpc_call_raw(&req).await?;
2250        let mut cur = Cursor::from_slice(&body);
2251        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2252            tl::enums::messages::Messages::Messages(m) => m.messages,
2253            tl::enums::messages::Messages::Slice(m)    => m.messages,
2254            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2255            tl::enums::messages::Messages::NotModified(_) => vec![],
2256        };
2257        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2258    }
2259
2260    /// Get the pinned message in a chat.
2261    pub async fn get_pinned_message(
2262        &self,
2263        peer: tl::enums::Peer,
2264    ) -> Result<Option<update::IncomingMessage>, InvocationError> {
2265        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2266        let req = tl::functions::messages::Search {
2267            peer: input_peer,
2268            q: String::new(),
2269            from_id: None,
2270            saved_peer_id: None,
2271            saved_reaction: None,
2272            top_msg_id: None,
2273            filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
2274            min_date: 0,
2275            max_date: 0,
2276            offset_id: 0,
2277            add_offset: 0,
2278            limit: 1,
2279            max_id: 0,
2280            min_id: 0,
2281            hash: 0,
2282        };
2283        let body    = self.rpc_call_raw(&req).await?;
2284        let mut cur = Cursor::from_slice(&body);
2285        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2286            tl::enums::messages::Messages::Messages(m) => m.messages,
2287            tl::enums::messages::Messages::Slice(m)    => m.messages,
2288            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2289            tl::enums::messages::Messages::NotModified(_) => vec![],
2290        };
2291        Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
2292    }
2293
2294    /// Pin a message in a chat.
2295    pub async fn pin_message(
2296        &self,
2297        peer:       tl::enums::Peer,
2298        message_id: i32,
2299        silent:     bool,
2300        unpin:      bool,
2301        pm_oneside: bool,
2302    ) -> Result<(), InvocationError> {
2303        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2304        let req = tl::functions::messages::UpdatePinnedMessage {
2305            silent,
2306            unpin,
2307            pm_oneside,
2308            peer: input_peer,
2309            id:   message_id,
2310        };
2311        self.rpc_write(&req).await
2312    }
2313
    /// Unpin a specific message.
    ///
    /// Thin wrapper over [`Client::pin_message`] with `unpin = true`
    /// (`silent` is also set, `pm_oneside` is not).
    pub async fn unpin_message(
        &self,
        peer:       tl::enums::Peer,
        message_id: i32,
    ) -> Result<(), InvocationError> {
        self.pin_message(peer, message_id, true, true, false).await
    }
2322
2323    /// Unpin all messages in a chat.
2324    pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2325        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2326        let req = tl::functions::messages::UnpinAllMessages {
2327            peer:      input_peer,
2328            top_msg_id: None,
2329            saved_peer_id: None,
2330        };
2331        self.rpc_write(&req).await
2332    }
2333
2334    // ── Message search ─────────────────────────────────────────────────────
2335
2336    /// Search messages in a chat (simple form).
2337    /// For advanced filtering use [`Client::search`] → [`SearchBuilder`].
2338    pub async fn search_messages(
2339        &self,
2340        peer:  tl::enums::Peer,
2341        query: &str,
2342        limit: i32,
2343    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2344        self.search(peer, query).limit(limit).fetch(self).await
2345    }
2346
2347    /// G-31: Fluent search builder for in-chat message search.
2348    pub fn search(&self, peer: tl::enums::Peer, query: &str) -> SearchBuilder {
2349        SearchBuilder::new(peer, query.to_string())
2350    }
2351
2352    /// Search globally (simple form). For filtering use [`Client::search_global_builder`].
2353    pub async fn search_global(
2354        &self,
2355        query: &str,
2356        limit: i32,
2357    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2358        self.search_global_builder(query).limit(limit).fetch(self).await
2359    }
2360
2361    /// G-32: Fluent builder for global cross-chat search.
2362    pub fn search_global_builder(&self, query: &str) -> GlobalSearchBuilder {
2363        GlobalSearchBuilder::new(query.to_string())
2364    }
2365
2366    // ── Scheduled messages ─────────────────────────────────────────────────
2367
2368    /// Retrieve all scheduled messages in a chat.
2369    ///
2370    /// Scheduled messages are messages set to be sent at a future time using
2371    /// [`InputMessage::schedule_date`].  Returns them newest-first.
2372    ///
2373    /// # Example
2374    /// ```rust,no_run
2375    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
2376    /// let scheduled = client.get_scheduled_messages(peer).await?;
2377    /// for msg in &scheduled {
2378    ///     println!("Scheduled: {:?} at {:?}", msg.text(), msg.date());
2379    /// }
2380    /// # Ok(()) }
2381    /// ```
2382    pub async fn get_scheduled_messages(
2383        &self,
2384        peer: tl::enums::Peer,
2385    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2386        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2387        let req = tl::functions::messages::GetScheduledHistory {
2388            peer: input_peer,
2389            hash: 0,
2390        };
2391        let body    = self.rpc_call_raw(&req).await?;
2392        let mut cur = Cursor::from_slice(&body);
2393        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2394            tl::enums::messages::Messages::Messages(m)        => m.messages,
2395            tl::enums::messages::Messages::Slice(m)           => m.messages,
2396            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2397            tl::enums::messages::Messages::NotModified(_)     => vec![],
2398        };
2399        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2400    }
2401
2402    /// Delete one or more scheduled messages by their IDs.
2403    pub async fn delete_scheduled_messages(
2404        &self,
2405        peer: tl::enums::Peer,
2406        ids:  Vec<i32>,
2407    ) -> Result<(), InvocationError> {
2408        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2409        let req = tl::functions::messages::DeleteScheduledMessages {
2410            peer: input_peer,
2411            id:   ids,
2412        };
2413        self.rpc_write(&req).await
2414    }
2415
2416    // ── Callback / Inline Queries ──────────────────────────────────────────
2417
2418    /// G-24: Edit an inline message by its [`InputBotInlineMessageId`].
2419    ///
2420    /// Inline messages live on the bot's home DC, not necessarily the current
2421    /// connection's DC.  This method sends the edit RPC on the correct DC by
2422    /// using the DC ID encoded in `msg_id` (high 20 bits of the `dc_id` field).
2423    ///
2424    /// # Example
2425    /// ```rust,no_run
2426    /// # async fn f(
2427    /// #   client: layer_client::Client,
2428    /// #   id: layer_tl_types::enums::InputBotInlineMessageId,
2429    /// # ) -> Result<(), Box<dyn std::error::Error>> {
2430    /// client.edit_inline_message(id, "new text", None).await?;
2431    /// # Ok(()) }
2432    /// ```
2433    pub async fn edit_inline_message(
2434        &self,
2435        id:           tl::enums::InputBotInlineMessageId,
2436        new_text:     &str,
2437        reply_markup: Option<tl::enums::ReplyMarkup>,
2438    ) -> Result<bool, InvocationError> {
2439        let req = tl::functions::messages::EditInlineBotMessage {
2440            no_webpage:   false,
2441            invert_media: false,
2442            id,
2443            message:      Some(new_text.to_string()),
2444            media:        None,
2445            reply_markup,
2446            entities:     None,
2447        };
2448        let body = self.rpc_call_raw(&req).await?;
2449        // Bool#997275b5 = boolTrue; Bool#bc799737 = boolFalse
2450        Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2451    }
2452
2453    /// Answer a callback query from an inline keyboard button press (bots only).
2454    pub async fn answer_callback_query(
2455        &self,
2456        query_id: i64,
2457        text:     Option<&str>,
2458        alert:    bool,
2459    ) -> Result<bool, InvocationError> {
2460        let req = tl::functions::messages::SetBotCallbackAnswer {
2461            alert,
2462            query_id,
2463            message:    text.map(|s| s.to_string()),
2464            url:        None,
2465            cache_time: 0,
2466        };
2467        let body = self.rpc_call_raw(&req).await?;
2468        Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2469    }
2470
2471    pub async fn answer_inline_query(
2472        &self,
2473        query_id:    i64,
2474        results:     Vec<tl::enums::InputBotInlineResult>,
2475        cache_time:  i32,
2476        is_personal: bool,
2477        next_offset: Option<String>,
2478    ) -> Result<bool, InvocationError> {
2479        let req = tl::functions::messages::SetInlineBotResults {
2480            gallery:        false,
2481            private:        is_personal,
2482            query_id,
2483            results,
2484            cache_time,
2485            next_offset,
2486            switch_pm:      None,
2487            switch_webview: None,
2488        };
2489        let body = self.rpc_call_raw(&req).await?;
2490        Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2491    }
2492
2493    // ── Dialogs ────────────────────────────────────────────────────────────
2494
2495    /// Fetch up to `limit` dialogs, most recent first. Populates entity/message.
2496    pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
2497        let req = tl::functions::messages::GetDialogs {
2498            exclude_pinned: false,
2499            folder_id:      None,
2500            offset_date:    0,
2501            offset_id:      0,
2502            offset_peer:    tl::enums::InputPeer::Empty,
2503            limit,
2504            hash:           0,
2505        };
2506
2507        let body    = self.rpc_call_raw(&req).await?;
2508        let mut cur = Cursor::from_slice(&body);
2509        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2510            tl::enums::messages::Dialogs::Dialogs(d) => d,
2511            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
2512                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2513            },
2514            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2515        };
2516
2517        // Build message map
2518        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2519            .map(|m| {
2520                let id = match &m {
2521                    tl::enums::Message::Message(x) => x.id,
2522                    tl::enums::Message::Service(x) => x.id,
2523                    tl::enums::Message::Empty(x)   => x.id,
2524                };
2525                (id, m)
2526            })
2527            .collect();
2528
2529        // Build user map
2530        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2531            .filter_map(|u| {
2532                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2533            })
2534            .collect();
2535
2536        // Build chat map
2537        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2538            .filter_map(|c| {
2539                let id = match &c {
2540                    tl::enums::Chat::Chat(x)             => x.id,
2541                    tl::enums::Chat::Forbidden(x)    => x.id,
2542                    tl::enums::Chat::Channel(x)          => x.id,
2543                    tl::enums::Chat::ChannelForbidden(x) => x.id,
2544                    tl::enums::Chat::Empty(x)            => x.id,
2545                };
2546                Some((id, c))
2547            })
2548            .collect();
2549
2550        // Cache peers for future access_hash lookups
2551        {
2552            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2553            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2554            self.cache_users_slice(&u_list).await;
2555            self.cache_chats_slice(&c_list).await;
2556        }
2557
2558        let result = raw.dialogs.into_iter().map(|d| {
2559            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2560            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2561
2562            let message = msg_map.get(&top_id).cloned();
2563            let entity = peer.and_then(|p| match p {
2564                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2565                _ => None,
2566            });
2567            let chat = peer.and_then(|p| match p {
2568                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
2569                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2570                _ => None,
2571            });
2572
2573            Dialog { raw: d, message, entity, chat }
2574        }).collect();
2575
2576        Ok(result)
2577    }
2578
2579    /// Internal helper: fetch dialogs with a custom GetDialogs request.
2580    #[allow(dead_code)]
2581    async fn get_dialogs_raw(
2582        &self,
2583        req: tl::functions::messages::GetDialogs,
2584    ) -> Result<Vec<Dialog>, InvocationError> {
2585        let body    = self.rpc_call_raw(&req).await?;
2586        let mut cur = Cursor::from_slice(&body);
2587        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2588            tl::enums::messages::Dialogs::Dialogs(d) => d,
2589            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
2590                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2591            },
2592            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2593        };
2594
2595        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2596            .map(|m| {
2597                let id = match &m {
2598                    tl::enums::Message::Message(x) => x.id,
2599                    tl::enums::Message::Service(x) => x.id,
2600                    tl::enums::Message::Empty(x)   => x.id,
2601                };
2602                (id, m)
2603            })
2604            .collect();
2605
2606        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2607            .filter_map(|u| {
2608                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2609            })
2610            .collect();
2611
2612        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2613            .filter_map(|c| {
2614                let id = match &c {
2615                    tl::enums::Chat::Chat(x)             => x.id,
2616                    tl::enums::Chat::Forbidden(x)    => x.id,
2617                    tl::enums::Chat::Channel(x)          => x.id,
2618                    tl::enums::Chat::ChannelForbidden(x) => x.id,
2619                    tl::enums::Chat::Empty(x)            => x.id,
2620                };
2621                Some((id, c))
2622            })
2623            .collect();
2624
2625        {
2626            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2627            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2628            self.cache_users_slice(&u_list).await;
2629            self.cache_chats_slice(&c_list).await;
2630        }
2631
2632        let result = raw.dialogs.into_iter().map(|d| {
2633            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2634            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2635
2636            let message = msg_map.get(&top_id).cloned();
2637            let entity = peer.and_then(|p| match p {
2638                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2639                _ => None,
2640            });
2641            let chat = peer.and_then(|p| match p {
2642                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
2643                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2644                _ => None,
2645            });
2646
2647            Dialog { raw: d, message, entity, chat }
2648        }).collect();
2649
2650        Ok(result)
2651    }
2652
2653    /// Like `get_dialogs_raw` but also returns the total count from `messages.DialogsSlice`.
2654    async fn get_dialogs_raw_with_count(
2655        &self,
2656        req: tl::functions::messages::GetDialogs,
2657    ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
2658        let body    = self.rpc_call_raw(&req).await?;
2659        let mut cur = Cursor::from_slice(&body);
2660        let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2661            tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
2662            tl::enums::messages::Dialogs::Slice(d)   => {
2663                let cnt = Some(d.count);
2664                (tl::types::messages::Dialogs {
2665                    dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2666                }, cnt)
2667            }
2668            tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
2669        };
2670
2671        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2672            .filter_map(|m| {
2673                let id = match &m {
2674                    tl::enums::Message::Message(x) => x.id,
2675                    tl::enums::Message::Service(x) => x.id,
2676                    tl::enums::Message::Empty(x)   => x.id,
2677                };
2678                Some((id, m))
2679            }).collect();
2680
2681        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2682            .filter_map(|u| {
2683                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2684            }).collect();
2685
2686        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2687            .filter_map(|c| {
2688                let id = match &c {
2689                    tl::enums::Chat::Chat(x)             => x.id,
2690                    tl::enums::Chat::Forbidden(x)        => x.id,
2691                    tl::enums::Chat::Channel(x)          => x.id,
2692                    tl::enums::Chat::ChannelForbidden(x) => x.id,
2693                    tl::enums::Chat::Empty(x)            => x.id,
2694                };
2695                Some((id, c))
2696            }).collect();
2697
2698        {
2699            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2700            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2701            self.cache_users_slice(&u_list).await;
2702            self.cache_chats_slice(&c_list).await;
2703        }
2704
2705        let result = raw.dialogs.into_iter().map(|d| {
2706            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2707            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2708            let message = msg_map.get(&top_id).cloned();
2709            let entity = peer.and_then(|p| match p {
2710                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2711                _ => None,
2712            });
2713            let chat = peer.and_then(|p| match p {
2714                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
2715                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2716                _ => None,
2717            });
2718            Dialog { raw: d, message, entity, chat }
2719        }).collect();
2720
2721        Ok((result, count))
2722    }
2723
2724    /// Like `get_messages` but also returns the total count from `messages.Slice`.
2725    async fn get_messages_with_count(
2726        &self,
2727        peer:      tl::enums::InputPeer,
2728        limit:     i32,
2729        offset_id: i32,
2730    ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
2731        let req = tl::functions::messages::GetHistory {
2732            peer, offset_id, offset_date: 0, add_offset: 0,
2733            limit, max_id: 0, min_id: 0, hash: 0,
2734        };
2735        let body    = self.rpc_call_raw(&req).await?;
2736        let mut cur = Cursor::from_slice(&body);
2737        let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2738            tl::enums::messages::Messages::Messages(m) => (m.messages, None),
2739            tl::enums::messages::Messages::Slice(m)    => {
2740                let cnt = Some(m.count);
2741                (m.messages, cnt)
2742            }
2743            tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
2744            tl::enums::messages::Messages::NotModified(_) => (vec![], None),
2745        };
2746        Ok((msgs.into_iter().map(update::IncomingMessage::from_raw).collect(), count))
2747    }
2748
2749    /// Download all bytes of a media attachment and save them to `path`.
2750    ///
2751    /// # Example
2752    /// ```rust,no_run
2753    /// # async fn f(client: layer_client::Client, msg: layer_client::update::IncomingMessage) -> Result<(), Box<dyn std::error::Error>> {
2754    /// if let Some(loc) = msg.download_location() {
2755    ///     client.download_media_to_file(loc, "/tmp/file.jpg").await?;
2756    /// }
2757    /// # Ok(()) }
2758    /// ```
2759    pub async fn download_media_to_file(
2760        &self,
2761        location: tl::enums::InputFileLocation,
2762        path:     impl AsRef<std::path::Path>,
2763    ) -> Result<(), InvocationError> {
2764        let bytes = self.download_media(location).await?;
2765        std::fs::write(path, &bytes).map_err(InvocationError::Io)?;
2766        Ok(())
2767    }
2768
2769    pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2770        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2771        let req = tl::functions::messages::DeleteHistory {
2772            just_clear:  false,
2773            revoke:      false,
2774            peer:        input_peer,
2775            max_id:      0,
2776            min_date:    None,
2777            max_date:    None,
2778        };
2779        self.rpc_write(&req).await
2780    }
2781
2782    /// Mark all messages in a chat as read.
2783    pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2784        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2785        match &input_peer {
2786            tl::enums::InputPeer::Channel(c) => {
2787                let req = tl::functions::channels::ReadHistory {
2788                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2789                        channel_id: c.channel_id, access_hash: c.access_hash,
2790                    }),
2791                    max_id: 0,
2792                };
2793                self.rpc_call_raw(&req).await?;
2794            }
2795            _ => {
2796                let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
2797                self.rpc_call_raw(&req).await?;
2798            }
2799        }
2800        Ok(())
2801    }
2802
2803    /// Clear unread mention markers.
2804    pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2805        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2806        let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
2807        self.rpc_write(&req).await
2808    }
2809
2810    // ── Chat actions (typing, etc) ─────────────────────────────────────────
2811
2812    /// Send a chat action (typing indicator, uploading photo, etc).
2813    ///
2814    /// For "typing" use `tl::enums::SendMessageAction::Typing`.
2815    /// For forum topic support use [`send_chat_action_ex`](Self::send_chat_action_ex)
2816    /// or the [`typing_in_topic`](Self::typing_in_topic) helper.
2817    pub async fn send_chat_action(
2818        &self,
2819        peer:   tl::enums::Peer,
2820        action: tl::enums::SendMessageAction,
2821    ) -> Result<(), InvocationError> {
2822        self.send_chat_action_ex(peer, action, None).await
2823    }
2824
2825    // ── Join / invite links ────────────────────────────────────────────────
2826
2827    /// Join a public chat or channel by username/peer.
2828    pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2829        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2830        match input_peer {
2831            tl::enums::InputPeer::Channel(c) => {
2832                let req = tl::functions::channels::JoinChannel {
2833                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2834                        channel_id: c.channel_id, access_hash: c.access_hash,
2835                    }),
2836                };
2837                self.rpc_call_raw(&req).await?;
2838            }
2839            tl::enums::InputPeer::Chat(c) => {
2840                let req = tl::functions::messages::AddChatUser {
2841                    chat_id:  c.chat_id,
2842                    user_id:  tl::enums::InputUser::UserSelf,
2843                    fwd_limit: 0,
2844                };
2845                self.rpc_call_raw(&req).await?;
2846            }
2847            _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
2848        }
2849        Ok(())
2850    }
2851
2852    /// Accept and join via an invite link.
2853    pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
2854        let hash = Self::parse_invite_hash(link)
2855            .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
2856        let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
2857        self.rpc_write(&req).await
2858    }
2859
2860    /// Extract hash from `https://t.me/+HASH` or `https://t.me/joinchat/HASH`.
2861    pub fn parse_invite_hash(link: &str) -> Option<&str> {
2862        if let Some(pos) = link.find("/+") {
2863            return Some(&link[pos + 2..]);
2864        }
2865        if let Some(pos) = link.find("/joinchat/") {
2866            return Some(&link[pos + 10..]);
2867        }
2868        None
2869    }
2870
2871    // ── Message history (paginated) ────────────────────────────────────────
2872
2873    /// Fetch a page of messages from a peer's history.
2874    pub async fn get_messages(
2875        &self,
2876        peer:      tl::enums::InputPeer,
2877        limit:     i32,
2878        offset_id: i32,
2879    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2880        let req = tl::functions::messages::GetHistory {
2881            peer, offset_id, offset_date: 0, add_offset: 0,
2882            limit, max_id: 0, min_id: 0, hash: 0,
2883        };
2884        let body    = self.rpc_call_raw(&req).await?;
2885        let mut cur = Cursor::from_slice(&body);
2886        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2887            tl::enums::messages::Messages::Messages(m) => m.messages,
2888            tl::enums::messages::Messages::Slice(m)    => m.messages,
2889            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2890            tl::enums::messages::Messages::NotModified(_) => vec![],
2891        };
2892        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2893    }
2894
2895    // ── Peer resolution ────────────────────────────────────────────────────
2896
2897    /// Resolve a peer string to a [`tl::enums::Peer`].
2898    pub async fn resolve_peer(
2899        &self,
2900        peer: &str,
2901    ) -> Result<tl::enums::Peer, InvocationError> {
2902        match peer.trim() {
2903            "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
2904            username if username.starts_with('@') => {
2905                self.resolve_username(&username[1..]).await
2906            }
2907            id_str => {
2908                if let Ok(id) = id_str.parse::<i64>() {
2909                    Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
2910                } else {
2911                    Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
2912                }
2913            }
2914        }
2915    }
2916
2917    /// Resolve a Telegram username to a [`tl::enums::Peer`] and cache the access hash.
2918    ///
2919    /// Also accepts usernames without the leading `@`.
2920    pub async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
2921        let req  = tl::functions::contacts::ResolveUsername {
2922            username: username.to_string(), referer: None,
2923        };
2924        let body = self.rpc_call_raw(&req).await?;
2925        let mut cur = Cursor::from_slice(&body);
2926        let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
2927            tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
2928        };
2929        // Cache users and chats from the resolution
2930        self.cache_users_slice(&resolved.users).await;
2931        self.cache_chats_slice(&resolved.chats).await;
2932        Ok(resolved.peer)
2933    }
2934
2935    // ── Raw invoke ─────────────────────────────────────────────────────────
2936
2937    /// Invoke any TL function directly, handling flood-wait retries.
2938    pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
2939        let body = self.rpc_call_raw(req).await?;
2940        let mut cur = Cursor::from_slice(&body);
2941        R::Return::deserialize(&mut cur).map_err(Into::into)
2942    }
2943
2944    async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
2945        let mut fail_count   = NonZeroU32::new(1).unwrap();
2946        let mut slept_so_far = Duration::default();
2947        loop {
2948            match self.do_rpc_call(req).await {
2949                Ok(body) => return Ok(body),
2950                Err(e) => {
2951                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
2952                    match self.inner.retry_policy.should_retry(&ctx) {
2953                        ControlFlow::Continue(delay) => {
2954                            sleep(delay).await;
2955                            slept_so_far += delay;
2956                            fail_count = fail_count.saturating_add(1);
2957                        }
2958                        ControlFlow::Break(()) => return Err(ctx.error),
2959                    }
2960                }
2961            }
2962        }
2963    }
2964
    /// Send an RPC call and await the response via a oneshot channel.
    ///
    /// This is the core of the split-stream design:
    ///  1. Pack the request and get its msg_id.
    ///  2. Register a oneshot Sender in the pending map (BEFORE sending).
    ///  3. Send the frame while holding the writer lock.
    ///  4. Release the writer lock immediately — the reader task now runs freely.
    ///  5. Await the oneshot Receiver; the reader task will fulfill it when
    ///     the matching rpc_result frame arrives.
    ///
    /// Step 2's ordering matters: if the Sender were registered after the
    /// frame hit the wire, the reader could receive the reply before the
    /// pending entry exists and have nowhere to route it.
    async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
        let (tx, rx) = oneshot::channel();
        // Inner scope bounds the writer lock: it is released as soon as the
        // frame is sent, before we start awaiting the reply below.
        {
            let raw_body = req.to_bytes();
            // G-06: compress large outgoing bodies
            let body = maybe_gz_pack(&raw_body);

            let mut w = self.inner.writer.lock().await;
            let fk = w.frame_kind.clone();

            // G-04 + G-07: drain any pending acks; if non-empty bundle them with
            // the request in a MessageContainer so acks piggyback on every send.
            let acks: Vec<i64> = w.pending_ack.drain(..).collect();

            if acks.is_empty() {
                // ── Simple path: standalone request ──────────────────────────
                let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
                // G-02: keep the plaintext body keyed by msg_id so it can be
                // re-sent later if needed.
                w.sent_bodies.insert(msg_id, body); // G-02
                self.inner.pending.lock().await.insert(msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            } else {
                // ── G-07 container path: [MsgsAck, request] ─────────────────
                // Build MsgsAck inner body
                let ack_body = build_msgs_ack_body(&acks);
                // Allocate inner msg_id+seqno for each item
                let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false); // non-content
                let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);  // content

                // Build container payload
                let container_payload = build_container_body(&[
                    (ack_msg_id, ack_seqno, ack_body.as_slice()),
                    (req_msg_id, req_seqno, body.as_slice()),
                ]);

                // Encrypt the container as a non-content-related outer message
                let wire = w.enc.pack_container(&container_payload);

                // The reply is keyed by the inner request's msg_id, not the
                // container's — register req_msg_id in the pending map.
                w.sent_bodies.insert(req_msg_id, body); // G-02
                self.inner.pending.lock().await.insert(req_msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
                log::trace!("[layer] G-07 container: bundled {} acks + request", acks.len());
            }
        }
        // Writer lock is released here; wait (bounded) for the reader task to
        // route the reply into our oneshot.
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            Ok(Ok(result)) => result,
            Ok(Err(_))     => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
            Err(_)         => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
        }
    }
3023
3024    /// Like `rpc_call_raw` but for write RPCs (Serializable, return type is Updates).
3025    /// Uses the same oneshot mechanism — the reader task signals success/failure.
3026    async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
3027        let mut fail_count   = NonZeroU32::new(1).unwrap();
3028        let mut slept_so_far = Duration::default();
3029        loop {
3030            let result = self.do_rpc_write(req).await;
3031            match result {
3032                Ok(()) => return Ok(()),
3033                Err(e) => {
3034                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
3035                    match self.inner.retry_policy.should_retry(&ctx) {
3036                        ControlFlow::Continue(delay) => {
3037                            sleep(delay).await;
3038                            slept_so_far += delay;
3039                            fail_count = fail_count.saturating_add(1);
3040                        }
3041                        ControlFlow::Break(()) => return Err(ctx.error),
3042                    }
3043                }
3044            }
3045        }
3046    }
3047
    /// Write-RPC variant of `do_rpc_call`: same pack → register → send → await
    /// sequence, but the response body is discarded — only success/failure is
    /// reported back to the caller.
    async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
        let (tx, rx) = oneshot::channel();
        // Inner scope bounds the writer lock so it is dropped before we await
        // the reply below.
        {
            let raw_body = req.to_bytes();
            // G-06: compress large outgoing bodies
            let body = maybe_gz_pack(&raw_body);

            let mut w = self.inner.writer.lock().await;
            let fk = w.frame_kind.clone();

            // G-04 + G-07: drain pending acks and bundle into container if any
            let acks: Vec<i64> = w.pending_ack.drain(..).collect();

            if acks.is_empty() {
                // Standalone request: pack, remember the body (G-02), register
                // the oneshot BEFORE sending, then write the frame.
                let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
                w.sent_bodies.insert(msg_id, body); // G-02
                self.inner.pending.lock().await.insert(msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            } else {
                // G-07 container path: bundle [MsgsAck, request] into a single
                // frame; the reply is keyed by the inner request's msg_id.
                let ack_body = build_msgs_ack_body(&acks);
                let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
                let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
                let container_payload = build_container_body(&[
                    (ack_msg_id, ack_seqno, ack_body.as_slice()),
                    (req_msg_id, req_seqno, body.as_slice()),
                ]);
                let wire = w.enc.pack_container(&container_payload);
                w.sent_bodies.insert(req_msg_id, body); // G-02
                self.inner.pending.lock().await.insert(req_msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
                log::trace!("[layer] G-07 write container: bundled {} acks + write", acks.len());
            }
        }
        // Await the routed reply (bounded); the body itself is ignored for writes.
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            Ok(Ok(result)) => result.map(|_| ()),
            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
            Err(_)         => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
        }
    }
3087
3088    // ── initConnection ─────────────────────────────────────────────────────
3089
3090    async fn init_connection(&self) -> Result<(), InvocationError> {
3091        use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
3092        let req = InvokeWithLayer {
3093            layer: tl::LAYER,
3094            query: InitConnection {
3095                api_id:           self.inner.api_id,
3096                device_model:     "Linux".to_string(),
3097                system_version:   "1.0".to_string(),
3098                app_version:      env!("CARGO_PKG_VERSION").to_string(),
3099                system_lang_code: "en".to_string(),
3100                lang_pack:        "".to_string(),
3101                lang_code:        "en".to_string(),
3102                proxy:            None,
3103                params:           None,
3104                query:            GetConfig {},
3105            },
3106        };
3107
3108        // Use the split-writer oneshot path (reader task routes the response).
3109        let body = self.rpc_call_raw_serializable(&req).await?;
3110
3111        let mut cur = Cursor::from_slice(&body);
3112        if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
3113            let allow_ipv6 = self.inner.allow_ipv6;
3114            let mut opts = self.inner.dc_options.lock().await;
3115            for opt in &cfg.dc_options {
3116                let tl::enums::DcOption::DcOption(o) = opt;
3117                if o.media_only || o.cdn || o.tcpo_only { continue; }
3118                if o.ipv6 && !allow_ipv6 { continue; }
3119                let addr = format!("{}:{}", o.ip_address, o.port);
3120                let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
3121                    dc_id: o.id, addr: addr.clone(),
3122                    auth_key: None, first_salt: 0, time_offset: 0,
3123                });
3124                entry.addr = addr;
3125            }
3126            log::info!("[layer] initConnection ✓  ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
3127        }
3128        Ok(())
3129    }
3130
3131    // ── DC migration ───────────────────────────────────────────────────────
3132
    /// Connect to `new_dc_id`, swap the writer/reader over to the new
    /// connection, re-run `initConnection`, and persist the session.
    ///
    /// Reuses a previously negotiated auth key for the target DC when one is
    /// cached; otherwise performs a fresh raw connection/key exchange.
    async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
        // Look up the target address; fall back to a hard-coded well-known
        // address if this DC id has never been seen before.
        let addr = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&new_dc_id).map(|e| e.addr.clone())
                .unwrap_or_else(|| "149.154.167.51:443".to_string())
        };
        log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");

        // Reuse a cached auth key for this DC if we have one.
        let saved_key = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&new_dc_id).and_then(|e| e.auth_key)
        };

        let socks5    = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();
        let conn = if let Some(key) = saved_key {
            Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
        } else {
            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
        };

        // Record the (possibly freshly negotiated) auth key for this DC.
        let new_key = conn.auth_key_bytes();
        {
            let mut opts = self.inner.dc_options.lock().await;
            let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
                dc_id: new_dc_id, addr: addr.clone(),
                auth_key: None, first_salt: 0, time_offset: 0,
            });
            entry.auth_key = Some(new_key);
        }

        // Split the new connection and replace writer + reader.
        let (new_writer, new_read, new_fk) = conn.into_writer();
        let new_ak  = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        *self.inner.writer.lock().await = new_writer;
        *self.inner.home_dc_id.lock().await = new_dc_id;

        // Hand the new read half to the reader task FIRST so it can route
        // the upcoming init_connection RPC response.
        let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

        // migrate_to() is called from user-facing methods (bot_sign_in,
        // request_login_code, sign_in) — NOT from inside the reader loop.
        // The reader task is a separate tokio task running concurrently, so
        // awaiting init_connection() here is safe: the reader is free to route
        // the RPC response while we wait. We must await before returning so
        // the caller can safely retry the original request on the new DC.
        //
        // Respect FLOOD_WAIT: if Telegram rate-limits init, wait and retry
        // rather than returning an error that would abort the whole auth flow.
        loop {
            match self.init_connection().await {
                Ok(()) => break,
                Err(InvocationError::Rpc(ref r))
                    if r.flood_wait_seconds().is_some() =>
                {
                    let secs = r.flood_wait_seconds().unwrap();
                    log::warn!(
                        "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs} — waiting"
                    );
                    // One extra second of slack so we don't re-trip the limit
                    // right on the boundary.
                    sleep(Duration::from_secs(secs + 1)).await;
                }
                Err(e) => return Err(e),
            }
        }

        // Best-effort persistence: a failed save must not fail the migration.
        self.save_session().await.ok();
        log::info!("[layer] Now on DC{new_dc_id} ✓");
        Ok(())
    }
3204
3205    // ── G-34: Graceful shutdown ────────────────────────────────────────────
3206
3207    /// Gracefully shut down the client.
3208    ///
3209    /// Signals the reader task to exit cleanly. Equivalent to cancelling the
3210    /// [`ShutdownToken`] returned from [`Client::connect`].
3211    ///
3212    /// In-flight RPCs will receive a `Dropped` error. Call `save_session()`
3213    /// before this if you want to persist the current auth state.
3214    pub fn disconnect(&self) {
3215        self.inner.shutdown_token.cancel();
3216    }
3217
3218    // ── G-54: Expose sync_update_state publicly ────────────────────────────
3219
3220    /// Sync the internal pts/qts/seq/date state with the Telegram server.
3221    ///
3222    /// This is called automatically on `connect()`. Call it manually if you
3223    /// need to reset the update gap-detection counters, e.g. after resuming
3224    /// from a long hibernation.
3225    pub async fn sync_update_state(&self) {
3226        let _ = self.sync_pts_state().await;
3227    }
3228
3229    // ── Cache helpers ──────────────────────────────────────────────────────
3230
3231    async fn cache_user(&self, user: &tl::enums::User) {
3232        self.inner.peer_cache.lock().await.cache_user(user);
3233    }
3234
3235    async fn cache_users_slice(&self, users: &[tl::enums::User]) {
3236        let mut cache = self.inner.peer_cache.lock().await;
3237        cache.cache_users(users);
3238    }
3239
3240    async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
3241        let mut cache = self.inner.peer_cache.lock().await;
3242        cache.cache_chats(chats);
3243    }
3244
3245    // Public versions used by sub-modules (media.rs, participants.rs, pts.rs)
3246    #[doc(hidden)]
3247    pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
3248        self.cache_users_slice(users).await;
3249    }
3250
3251    #[doc(hidden)]
3252    pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
3253        self.cache_chats_slice(chats).await;
3254    }
3255
3256    /// Public RPC call for use by sub-modules.
3257    #[doc(hidden)]
3258    pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
3259        self.rpc_call_raw(req).await
3260    }
3261
3262    /// Like rpc_call_raw but takes a Serializable (for InvokeWithLayer wrappers).
3263    async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
3264        let mut fail_count   = NonZeroU32::new(1).unwrap();
3265        let mut slept_so_far = Duration::default();
3266        loop {
3267            match self.do_rpc_write_returning_body(req).await {
3268                Ok(body) => return Ok(body),
3269                Err(e) => {
3270                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
3271                    match self.inner.retry_policy.should_retry(&ctx) {
3272                        ControlFlow::Continue(delay) => {
3273                            sleep(delay).await;
3274                            slept_so_far += delay;
3275                            fail_count = fail_count.saturating_add(1);
3276                        }
3277                        ControlFlow::Break(()) => return Err(ctx.error),
3278                    }
3279                }
3280            }
3281        }
3282    }
3283
    /// Same send path as `do_rpc_write`, but hands the raw response body back
    /// to the caller (used for `InvokeWithLayer`-style wrappers that still
    /// need to deserialize the reply).
    async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
        let (tx, rx) = oneshot::channel();
        // Inner scope bounds the writer lock so it is dropped before awaiting
        // the reply below.
        {
            let raw_body = req.to_bytes();
            let body = maybe_gz_pack(&raw_body); // G-06
            let mut w = self.inner.writer.lock().await;
            let fk = w.frame_kind.clone();
            let acks: Vec<i64> = w.pending_ack.drain(..).collect(); // G-04+G-07
            if acks.is_empty() {
                // Standalone request: register the oneshot BEFORE sending so
                // the reader can always route the reply.
                let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
                w.sent_bodies.insert(msg_id, body); // G-02
                self.inner.pending.lock().await.insert(msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            } else {
                // G-07 container path: bundle [MsgsAck, request] into one
                // frame; the reply is keyed by the inner request's msg_id.
                let ack_body = build_msgs_ack_body(&acks);
                let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
                let (req_msg_id, req_seqno)  = w.enc.alloc_msg_seqno(true);
                let container_payload = build_container_body(&[
                    (ack_msg_id, ack_seqno, ack_body.as_slice()),
                    (req_msg_id, req_seqno, body.as_slice()),
                ]);
                let wire = w.enc.pack_container(&container_payload);
                w.sent_bodies.insert(req_msg_id, body); // G-02
                self.inner.pending.lock().await.insert(req_msg_id, tx);
                send_frame_write(&mut w.write_half, &wire, &fk).await?;
            }
        }
        // Await the routed reply (bounded) and surface the raw body.
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            Ok(Ok(result)) => result,
            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc channel closed".into())),
            Err(_)         => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
        }
    }
3317
3318    // ── Paginated dialog iterator ──────────────────────────────────────────
3319
3320    /// Fetch dialogs page by page.
3321    ///
3322    /// Returns a [`DialogIter`] that can be advanced with [`DialogIter::next`].
3323    /// This lets you page through all dialogs without loading them all at once.
3324    ///
3325    /// # Example
3326    /// ```rust,no_run
3327    /// # async fn f(client: layer_client::Client) -> Result<(), Box<dyn std::error::Error>> {
3328    /// let mut iter = client.iter_dialogs();
3329    /// while let Some(dialog) = iter.next(&client).await? {
3330    ///     println!("{}", dialog.title());
3331    /// }
3332    /// # Ok(()) }
3333    /// ```
3334    pub fn iter_dialogs(&self) -> DialogIter {
3335        DialogIter {
3336            offset_date: 0,
3337            offset_id:   0,
3338            offset_peer: tl::enums::InputPeer::Empty,
3339            done:        false,
3340            buffer:      VecDeque::new(),
3341            total:       None,
3342        }
3343    }
3344
3345    /// Fetch messages from a peer, page by page.
3346    ///
3347    /// Returns a [`MessageIter`] that can be advanced with [`MessageIter::next`].
3348    ///
3349    /// # Example
3350    /// ```rust,no_run
3351    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
3352    /// let mut iter = client.iter_messages(peer);
3353    /// while let Some(msg) = iter.next(&client).await? {
3354    ///     println!("{:?}", msg.text());
3355    /// }
3356    /// # Ok(()) }
3357    /// ```
3358    pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
3359        MessageIter {
3360            peer,
3361            offset_id: 0,
3362            done:      false,
3363            buffer:    VecDeque::new(),
3364            total:     None,
3365        }
3366    }
3367
3368    // ── resolve_peer helper returning Result on unknown hash ───────────────
3369
3370    /// Try to resolve a peer to InputPeer, returning an error if the access_hash
3371    /// is unknown (i.e. the peer has not been seen in any prior API call).
3372    pub async fn resolve_to_input_peer(
3373        &self,
3374        peer: &tl::enums::Peer,
3375    ) -> Result<tl::enums::InputPeer, InvocationError> {
3376        let cache = self.inner.peer_cache.lock().await;
3377        match peer {
3378            tl::enums::Peer::User(u) => {
3379                if u.user_id == 0 {
3380                    return Ok(tl::enums::InputPeer::PeerSelf);
3381                }
3382                match cache.users.get(&u.user_id) {
3383                    Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
3384                        user_id: u.user_id, access_hash: hash,
3385                    })),
3386                    None => Err(InvocationError::Deserialize(format!(
3387                        "access_hash unknown for user {}; resolve via username first", u.user_id
3388                    ))),
3389                }
3390            }
3391            tl::enums::Peer::Chat(c) => {
3392                Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
3393            }
3394            tl::enums::Peer::Channel(c) => {
3395                match cache.channels.get(&c.channel_id) {
3396                    Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
3397                        channel_id: c.channel_id, access_hash: hash,
3398                    })),
3399                    None => Err(InvocationError::Deserialize(format!(
3400                        "access_hash unknown for channel {}; resolve via username first", c.channel_id
3401                    ))),
3402                }
3403            }
3404        }
3405    }
3406
3407    // ── Multi-DC pool ──────────────────────────────────────────────────────
3408
3409    /// Invoke a request on a specific DC, using the pool.
3410    ///
3411    /// If the target DC has no auth key yet, one is acquired via DH and then
3412    /// authorized via `auth.exportAuthorization` / `auth.importAuthorization`
3413    /// so the worker DC can serve user-account requests too.
3414    pub async fn invoke_on_dc<R: RemoteCall>(
3415        &self,
3416        dc_id: i32,
3417        req:   &R,
3418    ) -> Result<R::Return, InvocationError> {
3419        let body = self.rpc_on_dc_raw(dc_id, req).await?;
3420        let mut cur = Cursor::from_slice(&body);
3421        R::Return::deserialize(&mut cur).map_err(Into::into)
3422    }
3423
3424    /// Raw RPC call routed to `dc_id`, exporting auth if needed.
3425    async fn rpc_on_dc_raw<R: RemoteCall>(
3426        &self,
3427        dc_id: i32,
3428        req:   &R,
3429    ) -> Result<Vec<u8>, InvocationError> {
3430        // Check if we need to open a new connection for this DC
3431        let needs_new = {
3432            let pool = self.inner.dc_pool.lock().await;
3433            !pool.has_connection(dc_id)
3434        };
3435
3436        if needs_new {
3437            let addr = {
3438                let opts = self.inner.dc_options.lock().await;
3439                opts.get(&dc_id).map(|e| e.addr.clone())
3440                    .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
3441            };
3442
3443            let socks5    = self.inner.socks5.clone();
3444            let transport = self.inner.transport.clone();
3445            let saved_key = {
3446                let opts = self.inner.dc_options.lock().await;
3447                opts.get(&dc_id).and_then(|e| e.auth_key)
3448            };
3449
3450            let dc_conn = if let Some(key) = saved_key {
3451                dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
3452            } else {
3453                let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
3454                // Export auth from home DC and import into worker DC
3455                let home_dc_id = *self.inner.home_dc_id.lock().await;
3456                if dc_id != home_dc_id {
3457                    if let Err(e) = self.export_import_auth(dc_id, &conn).await {
3458                        log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
3459                    }
3460                }
3461                conn
3462            };
3463
3464            let key = dc_conn.auth_key_bytes();
3465            {
3466                let mut opts = self.inner.dc_options.lock().await;
3467                if let Some(e) = opts.get_mut(&dc_id) {
3468                    e.auth_key = Some(key);
3469                }
3470            }
3471            self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
3472        }
3473
3474        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
3475        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
3476    }
3477
3478    /// Export authorization from the home DC and import it into `dc_id`.
3479    async fn export_import_auth(
3480        &self,
3481        dc_id:   i32,
3482        _dc_conn: &dc_pool::DcConnection, // reserved for future direct import
3483    ) -> Result<(), InvocationError> {
3484        // Export from home DC
3485        let export_req = tl::functions::auth::ExportAuthorization { dc_id };
3486        let body    = self.rpc_call_raw(&export_req).await?;
3487        let mut cur = Cursor::from_slice(&body);
3488        let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
3489            tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
3490        };
3491
3492        // Import into the target DC via the pool
3493        let import_req = tl::functions::auth::ImportAuthorization {
3494            id:    exported.id,
3495            bytes: exported.bytes,
3496        };
3497        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
3498        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
3499        log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
3500        Ok(())
3501    }
3502
3503    // ── Private helpers ────────────────────────────────────────────────────
3504
3505    async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
3506        let body    = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
3507        let mut cur = Cursor::from_slice(&body);
3508        let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
3509            tl::enums::account::Password::Password(p) => p,
3510        };
3511        Ok(PasswordToken { password: pw })
3512    }
3513
3514    fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
3515        tl::functions::auth::SendCode {
3516            phone_number: phone.to_string(),
3517            api_id:       self.inner.api_id,
3518            api_hash:     self.inner.api_hash.clone(),
3519            settings:     tl::enums::CodeSettings::CodeSettings(
3520                tl::types::CodeSettings {
3521                    allow_flashcall: false, current_number: false, allow_app_hash: false,
3522                    allow_missed_call: false, allow_firebase: false, unknown_number: false,
3523                    logout_tokens: None, token: None, app_sandbox: None,
3524                },
3525            ),
3526        }
3527    }
3528
3529    fn extract_user_name(user: &tl::enums::User) -> String {
3530        match user {
3531            tl::enums::User::User(u) => {
3532                format!("{} {}",
3533                    u.first_name.as_deref().unwrap_or(""),
3534                    u.last_name.as_deref().unwrap_or(""))
3535                    .trim().to_string()
3536            }
3537            tl::enums::User::Empty(_) => "(unknown)".into(),
3538        }
3539    }
3540
3541    fn extract_password_params(
3542        algo: &tl::enums::PasswordKdfAlgo,
3543    ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
3544        match algo {
3545            tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
3546                Ok((&a.salt1, &a.salt2, &a.p, a.g))
3547            }
3548            _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
3549        }
3550    }
3551}
3552
3553// ─── Paginated iterators ──────────────────────────────────────────────────────
3554
/// Cursor-based iterator over dialogs. Created by [`Client::iter_dialogs`].
pub struct DialogIter {
    /// Paging cursor: `date` of the last message of the previous page.
    offset_date: i32,
    /// Paging cursor: `top_message` id of the last dialog of the previous page.
    offset_id:   i32,
    /// Paging cursor: peer of the last dialog of the previous page.
    offset_peer: tl::enums::InputPeer,
    /// Set once a short/empty page arrives; `next` then returns `None`.
    done:        bool,
    /// Dialogs fetched from the server but not yet yielded to the caller.
    buffer:      VecDeque<Dialog>,
    /// Total dialog count as reported by the first server response.
    /// `None` until the first page is fetched.
    pub total: Option<i32>,
}
3566
3567impl DialogIter {
3568    const PAGE_SIZE: i32 = 100;
3569
3570    /// Total number of dialogs as reported by the server on the first page fetch.
3571    ///
3572    /// Returns `None` before the first [`next`](Self::next) call, and `None` for
3573    /// accounts with fewer dialogs than `PAGE_SIZE` (where the server returns
3574    /// `messages.Dialogs` instead of `messages.DialogsSlice`).
3575    pub fn total(&self) -> Option<i32> { self.total }
3576
3577    /// Fetch the next dialog. Returns `None` when all dialogs have been yielded.
3578    pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
3579        if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
3580        if self.done { return Ok(None); }
3581
3582        let req = tl::functions::messages::GetDialogs {
3583            exclude_pinned: false,
3584            folder_id:      None,
3585            offset_date:    self.offset_date,
3586            offset_id:      self.offset_id,
3587            offset_peer:    self.offset_peer.clone(),
3588            limit:          Self::PAGE_SIZE,
3589            hash:           0,
3590        };
3591
3592        let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
3593        // Populate total from the first response (messages.DialogsSlice carries a count).
3594        if self.total.is_none() {
3595            self.total = count;
3596        }
3597        if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
3598            self.done = true;
3599        }
3600
3601        // Prepare cursor for next page
3602        if let Some(last) = dialogs.last() {
3603            self.offset_date = last.message.as_ref().map(|m| match m {
3604                tl::enums::Message::Message(x) => x.date,
3605                tl::enums::Message::Service(x) => x.date,
3606                _ => 0,
3607            }).unwrap_or(0);
3608            self.offset_id = last.top_message();
3609            if let Some(peer) = last.peer() {
3610                self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
3611            }
3612        }
3613
3614        self.buffer.extend(dialogs);
3615        Ok(self.buffer.pop_front())
3616    }
3617}
3618
/// Cursor-based iterator over message history. Created by [`Client::iter_messages`].
pub struct MessageIter {
    /// Peer whose history is being walked; resolved to an InputPeer per page.
    peer:      tl::enums::Peer,
    /// Paging cursor: id of the last message of the previous page (0 = start).
    offset_id: i32,
    /// Set once a short/empty page arrives; `next` then returns `None`.
    done:      bool,
    /// Messages fetched from the server but not yet yielded to the caller.
    buffer:    VecDeque<update::IncomingMessage>,
    /// Total message count from the first server response (messages.Slice).
    /// `None` until the first page is fetched, `None` for `messages.Messages`
    /// (which returns an exact slice with no separate count).
    pub total: Option<i32>,
}
3630
3631impl MessageIter {
3632    const PAGE_SIZE: i32 = 100;
3633
3634    /// Total message count from the first server response.
3635    ///
3636    /// Returns `None` before the first [`next`](Self::next) call, or for chats
3637    /// where the server returns an exact (non-slice) response.
3638    pub fn total(&self) -> Option<i32> { self.total }
3639
3640    /// Fetch the next message (newest first). Returns `None` when all messages have been yielded.
3641    pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
3642        if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
3643        if self.done { return Ok(None); }
3644
3645        let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
3646        let (page, count) = client.get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
3647
3648        if self.total.is_none() { self.total = count; }
3649
3650        if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
3651            self.done = true;
3652        }
3653        if let Some(last) = page.last() {
3654            self.offset_id = last.id();
3655        }
3656
3657        self.buffer.extend(page);
3658        Ok(self.buffer.pop_front())
3659    }
3660}
3661
3662// ─── Public random helper (used by media.rs) ──────────────────────────────────
3663
/// Public wrapper for `random_i64` used by sub-modules.
///
/// Hidden from docs: exists only so sibling modules (e.g. `media`, per the
/// section header above) can reuse the crate-private RNG helper.
#[doc(hidden)]
pub fn random_i64_pub() -> i64 { random_i64() }
3667
3668// ─── Connection ───────────────────────────────────────────────────────────────
3669
/// How framing bytes are sent/received on a connection.
#[derive(Clone)]
enum FrameKind {
    /// Abridged framing: 1-byte word count, or 0x7f + 3-byte LE word count.
    Abridged,
    /// Intermediate framing: 4-byte little-endian length prefix.
    Intermediate,
    /// Full framing: nominally [len][seqno][payload][crc32], but currently
    /// sent with the same simple length prefix as Intermediate (see
    /// `send_frame`); the seqno fields are unused so far.
    #[allow(dead_code)]
    Full { send_seqno: u32, recv_seqno: u32 },
}
3678
3679
3680// ─── Split connection types ───────────────────────────────────────────────────
3681
3682/// Write half of a split connection.  Held under `Mutex` in `ClientInner`.
3683/// Owns the EncryptedSession (for packing) and the pending-RPC map.
3684struct ConnectionWriter {
3685    write_half: OwnedWriteHalf,
3686    enc:        EncryptedSession,
3687    frame_kind: FrameKind,
3688    /// G-04: msg_ids of received content messages waiting to be acked.
3689    /// Drained into a MsgsAck on every outgoing frame (bundled into container
3690    /// when sending an RPC, or sent standalone after route_frame).
3691    pending_ack: Vec<i64>,
3692    /// G-02: raw TL body bytes of every sent request, keyed by msg_id.
3693    /// On bad_msg_notification the matching body is re-encrypted with a fresh
3694    /// msg_id and re-sent transparently.
3695    sent_bodies: std::collections::HashMap<i64, Vec<u8>>,
3696}
3697
3698impl ConnectionWriter {
3699    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
3700    fn first_salt(&self)    -> i64         { self.enc.salt }
3701    fn time_offset(&self)   -> i32         { self.enc.time_offset }
3702}
3703
/// An unsplit connection: TCP stream + encryption state + framing kind.
/// Built by `connect_raw` / `connect_with_key`; converted into a
/// writer/reader pair via `into_writer`.
struct Connection {
    stream:     TcpStream,
    enc:        EncryptedSession,
    frame_kind: FrameKind,
}
3709
impl Connection {
    /// Open a TCP stream, optionally via SOCKS5, and apply transport init bytes.
    ///
    /// Direct connections additionally get OS-level TCP keepalive configured;
    /// SOCKS5 connections are handed off to the proxy config as-is.
    async fn open_stream(
        addr:      &str,
        socks5:    Option<&crate::socks5::Socks5Config>,
        transport: &TransportKind,
    ) -> Result<(TcpStream, FrameKind), InvocationError> {
        let stream = match socks5 {
            Some(proxy) => proxy.connect(addr).await?,
            None => {
                // Let tokio do the TCP handshake properly (await until connected),
                // then apply socket2 keepalive options to the live socket.
                let stream = TcpStream::connect(addr).await
                    .map_err(InvocationError::Io)?;

                // TCP-level keepalive: OS sends probes independently of our
                // application-level pings. Catches cases where the network
                // disappears without a TCP RST (e.g. mobile data switching,
                // NAT table expiry) faster than a 15 s application ping would.
                // We use socket2 only for the setsockopt call, not for connect.
                {
                    let sock = socket2::SockRef::from(&stream);
                    let keepalive = TcpKeepalive::new()
                        .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
                        .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
                    // with_retries is not available on Windows.
                    #[cfg(not(target_os = "windows"))]
                    let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
                    // Best-effort: a failed setsockopt must not kill the connect.
                    sock.set_tcp_keepalive(&keepalive).ok();
                }
                stream
            }
        };
        Self::apply_transport_init(stream, transport).await
    }

    /// Send the transport init bytes and return the stream + FrameKind.
    ///
    /// Each MTProto TCP transport announces itself with a fixed prefix
    /// (0xef for Abridged, 0xeeeeeeee for Intermediate, nothing for Full);
    /// Obfuscated performs its 64-byte randomized handshake instead.
    async fn apply_transport_init(
        mut stream: TcpStream,
        transport:  &TransportKind,
    ) -> Result<(TcpStream, FrameKind), InvocationError> {
        match transport {
            TransportKind::Abridged => {
                stream.write_all(&[0xef]).await?;
                Ok((stream, FrameKind::Abridged))
            }
            TransportKind::Intermediate => {
                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
                Ok((stream, FrameKind::Intermediate))
            }
            TransportKind::Full => {
                // Full transport has no init byte
                Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
            }
            TransportKind::Obfuscated { secret } => {
                // For obfuscated we do the full handshake inside open_obfuscated,
                // then wrap back in a plain TcpStream via into_inner.
                // Since ObfuscatedStream is a different type we reuse the Abridged
                // frame logic internally — the encryption layer handles everything.
                //
                // Implementation note: We convert to Abridged after the handshake
                // because ObfuscatedStream internally already uses Abridged framing
                // with XOR applied on top.  The outer Connection just sends raw bytes.
                //
                // NOTE(review): the spec constrains the random prefix (first
                // byte must not be 0xef, first dword must avoid reserved
                // values); this nonce is used unfiltered — confirm whether
                // derive_keys or the caller enforces those constraints.
                let mut nonce = [0u8; 64];
                getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
                // Write obfuscated handshake header
                let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
                let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
                // Stamp protocol tag into nonce[56..60]
                let mut handshake = nonce;
                handshake[56] = 0xef; handshake[57] = 0xef;
                handshake[58] = 0xef; handshake[59] = 0xef;
                // Only bytes 56.. are encrypted in the header we send.
                enc_cipher.apply(&mut handshake[56..]);
                stream.write_all(&handshake).await?;
                Ok((stream, FrameKind::Abridged))
            }
        }
    }

    /// Connect and run the full MTProto DH key-exchange, producing a fresh
    /// auth key.  The whole exchange is bounded by a 15-second timeout.
    async fn connect_raw(
        addr:      &str,
        socks5:    Option<&crate::socks5::Socks5Config>,
        transport: &TransportKind,
    ) -> Result<Self, InvocationError> {
        log::info!("[layer] Connecting to {addr} (DH) …");

        // Wrap the entire DH handshake in a timeout so a silent server
        // response (e.g. a mis-framed transport error) never causes an
        // infinite hang.
        let addr2      = addr.to_string();
        let socks5_c   = socks5.cloned();
        let transport_c = transport.clone();

        let fut = async move {
            let (mut stream, frame_kind) =
                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;

            // Plaintext (pre-auth) session used only for the DH exchange.
            let mut plain = Session::new();

            // Three request/response round-trips, state threaded s1 → s2 → s3.
            let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
            let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;

            let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
            let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;

            let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
            let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;

            let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            log::info!("[layer] DH complete ✓");

            Ok::<Self, InvocationError>(Self {
                stream,
                enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
                frame_kind,
            })
        };

        tokio::time::timeout(Duration::from_secs(15), fut)
            .await
            .map_err(|_| InvocationError::Deserialize(
                format!("DH handshake with {addr} timed out after 15 s")
            ))?
    }

    /// Connect reusing a previously established auth key (no DH exchange),
    /// also bounded by a 15-second timeout.
    async fn connect_with_key(
        addr:        &str,
        auth_key:    [u8; 256],
        first_salt:  i64,
        time_offset: i32,
        socks5:      Option<&crate::socks5::Socks5Config>,
        transport:   &TransportKind,
    ) -> Result<Self, InvocationError> {
        let addr2       = addr.to_string();
        let socks5_c    = socks5.cloned();
        let transport_c = transport.clone();

        let fut = async move {
            let (stream, frame_kind) =
                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
            Ok::<Self, InvocationError>(Self {
                stream,
                enc: EncryptedSession::new(auth_key, first_salt, time_offset),
                frame_kind,
            })
        };

        tokio::time::timeout(Duration::from_secs(15), fut)
            .await
            .map_err(|_| InvocationError::Deserialize(
                format!("connect_with_key to {addr} timed out after 15 s")
            ))?
    }

    /// Raw 256-byte auth key of this connection's session.
    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }

    /// Split into a write-only `ConnectionWriter` and the TCP read half.
    fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
        let (read_half, write_half) = self.stream.into_split();
        let writer = ConnectionWriter {
            write_half,
            enc:        self.enc,
            frame_kind: self.frame_kind.clone(),
            pending_ack: Vec::new(),
            sent_bodies: std::collections::HashMap::new(),
        };
        (writer, read_half, self.frame_kind)
    }
}
3881
3882// ─── Transport framing (multi-kind) ──────────────────────────────────────────
3883
3884/// Send a framed message using the active transport kind.
3885async fn send_frame(
3886    stream: &mut TcpStream,
3887    data:   &[u8],
3888    kind:   &FrameKind,
3889) -> Result<(), InvocationError> {
3890    match kind {
3891        FrameKind::Abridged => send_abridged(stream, data).await,
3892        FrameKind::Intermediate => {
3893            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3894            stream.write_all(data).await?;
3895            Ok(())
3896        }
3897        FrameKind::Full { .. } => {
3898            // seqno and CRC handled inside Connection; here we just prefix length
3899            // Full framing: [total_len 4B][seqno 4B][payload][crc32 4B]
3900            // But send_frame is called with already-encrypted payload.
3901            // We use a simplified approach: emit the same as Intermediate for now
3902            // and note that Full's seqno/CRC are transport-level, not app-level.
3903            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3904            stream.write_all(data).await?;
3905            Ok(())
3906        }
3907    }
3908}
3909
3910// ─── Split-reader helpers ─────────────────────────────────────────────────────
3911
/// Outcome of a timed frame read attempt.
enum FrameOutcome {
    /// A complete raw frame was read.
    Frame(Vec<u8>),
    /// Read or keepalive-send failed; connection should be considered dead.
    Error(InvocationError),
    Keepalive,  // timeout elapsed but ping was sent; caller should loop
}
3918
/// Read one frame with a 60-second keepalive timeout (PING_DELAY_SECS).
///
/// If the timeout fires we send a `PingDelayDisconnect` — this tells Telegram
/// to forcibly close the connection after `NO_PING_DISCONNECT` seconds of
/// silence, giving us a clean EOF to detect rather than a silently stale socket.
/// That mirrors what both grammers and the official Telegram clients do.
///
/// `_ak` (the auth key) is currently unused; presumably kept for signature
/// stability or future use — confirm before removing.
async fn recv_frame_with_keepalive(
    rh:      &mut OwnedReadHalf,
    fk:      &FrameKind,
    client:  &Client,
    _ak:     &[u8; 256],
) -> FrameOutcome {
    match tokio::time::timeout(Duration::from_secs(PING_DELAY_SECS), recv_frame_read(rh, fk)).await {
        Ok(Ok(raw)) => FrameOutcome::Frame(raw),
        Ok(Err(e))  => FrameOutcome::Error(e),
        Err(_) => {
            // Keepalive timeout: send PingDelayDisconnect so Telegram closes the
            // connection cleanly (EOF) if it hears nothing for NO_PING_DISCONNECT
            // seconds, rather than leaving a silently stale socket.
            let ping_req = tl::functions::PingDelayDisconnect {
                ping_id:          random_i64(),
                disconnect_delay: NO_PING_DISCONNECT,
            };
            // Writer lock is held only for packing + the single send below.
            let mut w = client.inner.writer.lock().await;
            let wire = w.enc.pack(&ping_req);
            let fk = w.frame_kind.clone();
            match send_frame_write(&mut w.write_half, &wire, &fk).await {
                Ok(()) => FrameOutcome::Keepalive,
                // If the write itself fails the connection is already dead.
                // Return Error so the reader immediately enters the reconnect loop
                // instead of sitting silent for another PING_DELAY_SECS.
                Err(e) => FrameOutcome::Error(e),
            }
        }
    }
}
3955
3956/// Send a framed message via an OwnedWriteHalf (split connection).
3957async fn send_frame_write(
3958    stream: &mut OwnedWriteHalf,
3959    data:   &[u8],
3960    kind:   &FrameKind,
3961) -> Result<(), InvocationError> {
3962    match kind {
3963        FrameKind::Abridged => {
3964            let words = data.len() / 4;
3965            if words < 0x7f {
3966                stream.write_all(&[words as u8]).await?;
3967            } else {
3968                let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
3969                stream.write_all(&b).await?;
3970            }
3971            stream.write_all(data).await?;
3972            Ok(())
3973        }
3974        FrameKind::Intermediate | FrameKind::Full { .. } => {
3975            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3976            stream.write_all(data).await?;
3977            Ok(())
3978        }
3979    }
3980}
3981
3982/// Receive a framed message via an OwnedReadHalf (split connection).
3983async fn recv_frame_read(
3984    stream: &mut OwnedReadHalf,
3985    kind:   &FrameKind,
3986) -> Result<Vec<u8>, InvocationError> {
3987    match kind {
3988        FrameKind::Abridged => {
3989            let mut h = [0u8; 1];
3990            stream.read_exact(&mut h).await?;
3991            let words = if h[0] < 0x7f {
3992                h[0] as usize
3993            } else {
3994                let mut b = [0u8; 3];
3995                stream.read_exact(&mut b).await?;
3996                b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
3997            };
3998            let len = words * 4;
3999            let mut buf = vec![0u8; len];
4000            stream.read_exact(&mut buf).await?;
4001            Ok(buf)
4002        }
4003        FrameKind::Intermediate | FrameKind::Full { .. } => {
4004            let mut len_buf = [0u8; 4];
4005            stream.read_exact(&mut len_buf).await?;
4006            let len = u32::from_le_bytes(len_buf) as usize;
4007            let mut buf = vec![0u8; len];
4008            stream.read_exact(&mut buf).await?;
4009            Ok(buf)
4010        }
4011    }
4012}
4013
4014
4015/// Send using Abridged framing (used for DH plaintext during connect).
4016async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
4017    let words = data.len() / 4;
4018    if words < 0x7f {
4019        stream.write_all(&[words as u8]).await?;
4020    } else {
4021        let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
4022        stream.write_all(&b).await?;
4023    }
4024    stream.write_all(data).await?;
4025    Ok(())
4026}
4027
/// Receive one Abridged-framed plaintext frame (used for the DH handshake).
///
/// Length prefix: a first byte below 0x7f is the payload size in 4-byte
/// words; otherwise three further bytes form a 24-bit little-endian word
/// count. Returns the raw payload bytes (`words * 4` long).
///
/// # Errors
/// - `InvocationError::Rpc` when the server sent a 4-byte transport error
///   code in place of a frame (see the `w == 1` special case below).
/// - `InvocationError::Deserialize` when the decoded word count is
///   implausible, indicating a framing mismatch.
async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
    let mut h = [0u8; 1];
    stream.read_exact(&mut h).await?;
    let words = if h[0] < 0x7f {
        h[0] as usize
    } else {
        // Extended form: 0x7f (or higher) marker, then 24-bit LE word count.
        let mut b = [0u8; 3];
        stream.read_exact(&mut b).await?;
        let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
        // word count of 1 after 0xFF = Telegram 4-byte transport error code
        // NOTE(review): this mapping of marker + w==1 to a transport error is
        // specific to this client's framing; confirm against the server's
        // actual error framing if this path ever misfires.
        if w == 1 {
            let mut code_buf = [0u8; 4];
            stream.read_exact(&mut code_buf).await?;
            let code = i32::from_le_bytes(code_buf);
            return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
        }
        w
    };
    // Guard against implausibly large reads — a raw 4-byte transport error
    // whose first byte was mis-read as a word count causes a hang otherwise.
    // (0x8000 words = 128 KiB, generous for the handshake frames this path
    // carries.)
    if words == 0 || words > 0x8000 {
        return Err(InvocationError::Deserialize(
            format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
        ));
    }
    let mut buf = vec![0u8; words * 4];
    stream.read_exact(&mut buf).await?;
    Ok(buf)
}
4057
4058/// Receive a plaintext (pre-auth) frame and deserialize it.
4059async fn recv_frame_plain<T: Deserializable>(
4060    stream: &mut TcpStream,
4061    _kind:  &FrameKind,
4062) -> Result<T, InvocationError> {
4063    let raw = recv_abridged(stream).await?; // DH always uses abridged for plaintext
4064    if raw.len() < 20 {
4065        return Err(InvocationError::Deserialize("plaintext frame too short".into()));
4066    }
4067    if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
4068        return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
4069    }
4070    let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
4071    let mut cur  = Cursor::from_slice(&raw[20..20 + body_len]);
4072    T::deserialize(&mut cur).map_err(Into::into)
4073}
4074
4075// ─── MTProto envelope ─────────────────────────────────────────────────────────
4076
/// Outcome of unwrapping one MTProto envelope frame (see `unwrap_envelope`).
enum EnvelopeResult {
    /// Raw TL body of an RPC result, handed back to the pending invoker.
    Payload(Vec<u8>),
    /// Updates parsed out of an updates-class constructor.
    Updates(Vec<update::Update>),
    /// A service message with nothing to surface to the caller.
    None,
}
4082
/// Recursively unwrap an MTProto envelope `body` into a payload, updates,
/// or nothing.
///
/// Dispatches on the leading constructor id: `rpc_result` and `gzip_packed`
/// recurse into their inner body, `msg_container` iterates its children,
/// `rpc_error` becomes an `Err`, known service messages are silenced, and
/// updates-class constructors are parsed into `Update`s. Any unrecognized
/// constructor is returned verbatim as `Payload` for the caller to decode.
fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
    if body.len() < 4 {
        return Err(InvocationError::Deserialize("body < 4 bytes".into()));
    }
    let cid = u32::from_le_bytes(body[..4].try_into().unwrap());

    match cid {
        ID_RPC_RESULT => {
            if body.len() < 12 {
                return Err(InvocationError::Deserialize("rpc_result too short".into()));
            }
            // Skip constructor (4) + req_msg_id (8), then unwrap the inner body.
            unwrap_envelope(body[12..].to_vec())
        }
        ID_RPC_ERROR => {
            if body.len() < 8 {
                return Err(InvocationError::Deserialize("rpc_error too short".into()));
            }
            let code    = i32::from_le_bytes(body[4..8].try_into().unwrap());
            let message = tl_read_string(&body[8..]).unwrap_or_default();
            Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
        }
        ID_MSG_CONTAINER => {
            if body.len() < 8 {
                return Err(InvocationError::Deserialize("container too short".into()));
            }
            let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
            let mut pos = 8usize;
            let mut payload: Option<Vec<u8>> = None;
            let mut updates_buf: Vec<update::Update> = Vec::new();

            for _ in 0..count {
                // Inner header: msg_id (8) + seqno (4) + length (4) = 16 bytes;
                // a truncated container simply stops iteration early.
                if pos + 16 > body.len() { break; }
                let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
                pos += 16;
                if pos + inner_len > body.len() { break; }
                let inner = body[pos..pos + inner_len].to_vec();
                pos += inner_len;
                match unwrap_envelope(inner)? {
                    // If a container somehow carries several payloads, the
                    // last one wins.
                    EnvelopeResult::Payload(p)  => { payload = Some(p); }
                    EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
                    EnvelopeResult::None        => {}
                }
            }
            // A payload takes precedence over updates collected alongside it.
            if let Some(p) = payload {
                Ok(EnvelopeResult::Payload(p))
            } else if !updates_buf.is_empty() {
                Ok(EnvelopeResult::Updates(updates_buf))
            } else {
                Ok(EnvelopeResult::None)
            }
        }
        ID_GZIP_PACKED => {
            // gzip_packed packed_data:bytes — inflate, then unwrap the result.
            let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
            unwrap_envelope(gz_inflate(&bytes)?)
        }
        // MTProto service messages — silently acknowledged, no payload extracted.
        // NOTE: ID_PONG is intentionally NOT listed here. Pong arrives as a bare
        // top-level frame (never inside rpc_result), so it is handled in route_frame
        // directly. Silencing it here would drop it before invoke() can resolve it.
        ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
        // These are correctly silenced (grammers silences these too)
        | 0xd33b5459  // MsgsStateReq
        | 0x04deb57d  // MsgsStateInfo
        | 0x8cc0d131  // MsgsAllInfo
        | 0x276d3ec6  // MsgDetailedInfo
        | 0x809db6df  // MsgNewDetailedInfo
        | 0x7d861a08  // MsgResendReq / MsgResendAnsReq
        | 0x0949d9dc  // FutureSalt
        | 0xae500895  // FutureSalts
        | 0x9299359f  // HttpWait
        | 0xe22045fc  // DestroySessionOk
        | 0x62d350c9  // DestroySessionNone
        => {
            Ok(EnvelopeResult::None)
        }
        ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
        | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
        | ID_UPDATES_TOO_LONG => {
            Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
        }
        _ => Ok(EnvelopeResult::Payload(body)),
    }
}
4166
4167// ─── Utilities ────────────────────────────────────────────────────────────────
4168
4169fn random_i64() -> i64 {
4170    let mut b = [0u8; 8];
4171    getrandom::getrandom(&mut b).expect("getrandom");
4172    i64::from_le_bytes(b)
4173}
4174
4175/// Apply ±20 % random jitter to a backoff delay.
4176/// Prevents thundering-herd when many clients reconnect simultaneously
4177/// (e.g. after a server restart or a shared network outage).
4178fn jitter_delay(base_ms: u64) -> Duration {
4179    // Use two random bytes for the jitter factor (0..=65535 → 0.80 … 1.20).
4180    let mut b = [0u8; 2];
4181    getrandom::getrandom(&mut b).unwrap_or(());
4182    let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; // 0.0 … 1.0
4183    let factor    = 0.80 + rand_frac * 0.40;                 // 0.80 … 1.20
4184    Duration::from_millis((base_ms as f64 * factor) as u64)
4185}
4186
/// Decode a TL-serialized `bytes` value from the front of `data`.
///
/// Short form: a first byte below 254 is the length itself. Long form:
/// a marker byte (254/255) followed by a 24-bit little-endian length.
/// Returns `None` when `data` is too short for the declared length; an
/// empty input decodes to an empty byte vector.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let Some(&first) = data.first() else { return Some(Vec::new()); };
    let (len, start) = if first < 254 {
        (first as usize, 1)
    } else if let [_, a, b, c, ..] = *data {
        (a as usize | (b as usize) << 8 | (c as usize) << 16, 4)
    } else {
        return None;
    };
    data.get(start..start + len).map(<[u8]>::to_vec)
}

/// Decode a TL `bytes` value and interpret it as (lossy) UTF-8 text.
fn tl_read_string(data: &[u8]) -> Option<String> {
    tl_read_bytes(data).map(|bytes| String::from_utf8_lossy(&bytes).into_owned())
}
4200
4201fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
4202    use std::io::Read;
4203    let mut out = Vec::new();
4204    if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
4205        return Ok(out);
4206    }
4207    out.clear();
4208    flate2::read::ZlibDecoder::new(data)
4209        .read_to_end(&mut out)
4210        .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
4211    Ok(out)
4212}
4213
4214// ── G-06: outgoing gzip compression ──────────────────────────────────────────
4215
/// Minimum body size above which we attempt zlib compression.
/// Below this threshold the gzip_packed wrapper overhead exceeds the gain.
// NOTE(review): 512 B is a heuristic cutoff — presumably tuned for typical
// TL request bodies; revisit if profiling shows otherwise.
const COMPRESSION_THRESHOLD: usize = 512;
4219
/// TL `bytes` wire encoding (used inside gzip_packed).
///
/// Short form (len < 254): one length byte + data. Long form: the 0xfe
/// marker + a 24-bit little-endian length + data. Either way the output is
/// NUL-padded to a 4-byte boundary.
///
/// # Panics
/// Panics if `data` is 2^24 bytes or longer — the long form carries only a
/// 24-bit length, so larger payloads would previously have been *silently
/// truncated* on the wire, producing a corrupt frame.
fn tl_write_bytes(data: &[u8]) -> Vec<u8> {
    let len = data.len();
    assert!(len < 1 << 24, "TL bytes length {len} does not fit in 24 bits");
    let mut out = Vec::with_capacity(4 + len);
    if len < 254 {
        out.push(len as u8);
        out.extend_from_slice(data);
    } else {
        out.push(0xfe);
        out.extend_from_slice(&(len as u32).to_le_bytes()[..3]);
        out.extend_from_slice(data);
    }
    // Pad with NULs so the total encoded size is a multiple of 4.
    while out.len() % 4 != 0 {
        out.push(0);
    }
    out
}
4238
4239/// Wrap `data` in a `gzip_packed#3072cfa1 packed_data:bytes` TL frame.
4240fn gz_pack_body(data: &[u8]) -> Vec<u8> {
4241    use std::io::Write;
4242    let mut enc = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
4243    let _ = enc.write_all(data);
4244    let compressed = enc.finish().unwrap_or_default();
4245    let mut out = Vec::with_capacity(4 + 4 + compressed.len());
4246    out.extend_from_slice(&ID_GZIP_PACKED.to_le_bytes());
4247    out.extend(tl_write_bytes(&compressed));
4248    out
4249}
4250
4251/// Optionally compress `data`.  Returns the compressed `gzip_packed` wrapper
4252/// if it is shorter than the original; otherwise returns `data` unchanged.
4253fn maybe_gz_pack(data: &[u8]) -> Vec<u8> {
4254    if data.len() <= COMPRESSION_THRESHOLD {
4255        return data.to_vec();
4256    }
4257    let packed = gz_pack_body(data);
4258    if packed.len() < data.len() { packed } else { data.to_vec() }
4259}
4260
4261// ── G-04 + G-07: MsgsAck body builder ───────────────────────────────────────
4262
4263/// Build the TL body for `msgs_ack#62d6b459 msg_ids:Vector<long>`.
4264fn build_msgs_ack_body(msg_ids: &[i64]) -> Vec<u8> {
4265    // msgs_ack#62d6b459 msg_ids:Vector<long>
4266    // Vector<long>: 0x1cb5c415 + count:int + [i64...]
4267    let mut out = Vec::with_capacity(4 + 4 + 4 + msg_ids.len() * 8);
4268    out.extend_from_slice(&ID_MSGS_ACK.to_le_bytes());
4269    out.extend_from_slice(&0x1cb5c415_u32.to_le_bytes()); // Vector constructor
4270    out.extend_from_slice(&(msg_ids.len() as u32).to_le_bytes());
4271    for &id in msg_ids {
4272        out.extend_from_slice(&id.to_le_bytes());
4273    }
4274    out
4275}
4276
4277// ── G-07: MessageContainer body builder ──────────────────────────────────────
4278
4279/// Build the body of a `msg_container#73f1f8dc` from a list of
4280/// `(msg_id, seqno, body)` inner messages.
4281///
4282/// The caller is responsible for allocating msg_id and seqno for each entry
4283/// via `EncryptedSession::alloc_msg_seqno`.
4284fn build_container_body(messages: &[(i64, i32, &[u8])]) -> Vec<u8> {
4285    let total_body: usize = messages.iter().map(|(_, _, b)| 16 + b.len()).sum();
4286    let mut out = Vec::with_capacity(8 + total_body);
4287    out.extend_from_slice(&ID_MSG_CONTAINER.to_le_bytes());
4288    out.extend_from_slice(&(messages.len() as u32).to_le_bytes());
4289    for &(msg_id, seqno, body) in messages {
4290        out.extend_from_slice(&msg_id.to_le_bytes());
4291        out.extend_from_slice(&seqno.to_le_bytes());
4292        out.extend_from_slice(&(body.len() as u32).to_le_bytes());
4293        out.extend_from_slice(body);
4294    }
4295    out
4296}