// layer_client/lib.rs
1//! # layer-client
2//!
3//! Production-grade async Telegram client built on MTProto.
4//!
5//! ## Features
6//! - User login (phone + code + 2FA SRP) and bot token login
7//! - Peer access-hash caching — API calls always carry correct access hashes
8//! - `FLOOD_WAIT` auto-retry with configurable policy
9//! - Typed async update stream: `NewMessage`, `MessageEdited`, `MessageDeleted`,
10//!   `CallbackQuery`, `InlineQuery`, `InlineSend`, `Raw`
11//! - Send / edit / delete / forward / pin messages
12//! - Search messages (per-chat and global)
13//! - Mark as read, delete dialogs, clear mentions
14//! - Join chat / accept invite links
15//! - Chat action (typing, uploading, …)
16//! - `get_me()` — fetch own User info
17//! - Paginated dialog and message iterators
18//! - DC migration, session persistence, reconnect
19
20#![deny(unsafe_code)]
21
22mod errors;
23mod retry;
24mod session;
25mod transport;
26mod two_factor_auth;
27pub mod update;
28pub mod parsers;
29pub mod media;
30pub mod participants;
31pub mod pts;
32
33// ── New feature modules ───────────────────────────────────────────────────────
34pub mod dc_pool;
35pub mod transport_obfuscated;
36pub mod transport_intermediate;
37pub mod socks5;
38pub mod session_backend;
39pub mod inline_iter;
40pub mod typing_guard;
41pub mod keyboard;
42
43#[macro_use]
44pub mod macros;
45
46pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
47pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
48pub use update::Update;
49pub use media::{UploadedFile, DownloadIter};
50pub use participants::Participant;
51pub use typing_guard::TypingGuard;
52pub use socks5::Socks5Config;
53pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
54pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
55
56/// Re-export of `layer_tl_types` — generated TL constructors, functions, and enums.
57/// Users can write `use layer_client::tl` instead of adding a separate `layer-tl-types` dep.
58pub use layer_tl_types as tl;
59
60use std::collections::HashMap;
61use std::collections::VecDeque;
62use std::num::NonZeroU32;
63use std::ops::ControlFlow;
64use std::sync::Arc;
65use std::time::Duration;
66
67use layer_mtproto::{EncryptedSession, Session, authentication as auth};
68use layer_tl_types::{Cursor, Deserializable, RemoteCall};
69use session::{DcEntry, PersistedSession};
70use tokio::io::{AsyncReadExt, AsyncWriteExt};
71use tokio::net::TcpStream;
72use tokio::sync::{mpsc, oneshot, Mutex};
73use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
74use tokio::time::sleep;
75use tokio_util::sync::CancellationToken;
76use socket2::TcpKeepalive;
77
78// ─── MTProto envelope constructor IDs ────────────────────────────────────────
79
// Constructor IDs of MTProto *service* messages (transport-level envelopes).
// The reader task matches incoming frames against these before handing the
// payload to the TL deserializer.
const ID_RPC_RESULT:       u32 = 0xf35c6d01;
const ID_RPC_ERROR:        u32 = 0x2144ca19;
const ID_MSG_CONTAINER:    u32 = 0x73f1f8dc;
const ID_GZIP_PACKED:      u32 = 0x3072cfa1;
const ID_PONG:             u32 = 0x347773c5;
const ID_MSGS_ACK:         u32 = 0x62d6b459;
const ID_BAD_SERVER_SALT:  u32 = 0xedab447b;
const ID_NEW_SESSION:      u32 = 0x9ec20908;
const ID_BAD_MSG_NOTIFY:   u32 = 0xa7eff811;
// Updates-related constructor IDs (various `Updates` envelope shapes).
const ID_UPDATES:          u32 = 0x74ae4240;
const ID_UPDATE_SHORT:     u32 = 0x78d4dec1;
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
const ID_UPDATE_SHORT_MSG:      u32 = 0x313bc7f8;
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
const ID_UPDATES_TOO_LONG:      u32 = 0xe317af7e;
95
96// ─── Keepalive / reconnect tuning ─────────────────────────────────────────────
97
/// How often to send a keepalive ping.
/// Telegram Desktop uses ~15 s on mobile connections.  60 s is too slow to
/// detect a dead connection before the user notices.
const PING_DELAY_SECS: u64 = 15;

/// Tell Telegram to close the connection if it hears nothing for this many
/// seconds.  Must be > PING_DELAY_SECS so a single missed ping doesn't drop us.
/// NOTE(review): with a 15 s ping interval, one *missed* ping means the server
/// hears nothing for up to ~30 s, which exceeds this 20 s window — the stated
/// invariant may actually require > 2×PING_DELAY_SECS; confirm the intent.
const NO_PING_DISCONNECT: i32 = 20;

/// Initial backoff before the first reconnect attempt.
const RECONNECT_BASE_MS: u64 = 500;

/// Maximum backoff between reconnect attempts.
/// 5 s cap instead of 30 s — on mobile, network outages are brief and a 30 s
/// sleep means the bot stays dead for up to 30 s after the network returns.
/// Official Telegram mobile clients use a short cap for the same reason.
const RECONNECT_MAX_SECS: u64 = 5;

/// TCP socket-level keepalive: start probes after this many seconds of idle.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
/// Interval between TCP keepalive probes.
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
/// Number of failed probes before the OS declares the connection dead.
const TCP_KEEPALIVE_PROBES: u32 = 3;
122
123// ─── PeerCache ────────────────────────────────────────────────────────────────
124
/// Caches access hashes for users and channels so every API call carries the
/// correct hash without re-resolving peers.
///
/// Populated opportunistically from server responses that carry `users` /
/// `chats` vectors (see `cache_users` / `cache_chats` below).
#[derive(Default)]
pub(crate) struct PeerCache {
    /// user_id → access_hash
    pub(crate) users:    HashMap<i64, i64>,
    /// channel_id → access_hash
    pub(crate) channels: HashMap<i64, i64>,
}
134
135impl PeerCache {
136    fn cache_user(&mut self, user: &tl::enums::User) {
137        if let tl::enums::User::User(u) = user {
138            if let Some(hash) = u.access_hash {
139                self.users.insert(u.id, hash);
140            }
141        }
142    }
143
144    fn cache_chat(&mut self, chat: &tl::enums::Chat) {
145        match chat {
146            tl::enums::Chat::Channel(c) => {
147                if let Some(hash) = c.access_hash {
148                    self.channels.insert(c.id, hash);
149                }
150            }
151            tl::enums::Chat::ChannelForbidden(c) => {
152                self.channels.insert(c.id, c.access_hash);
153            }
154            _ => {}
155        }
156    }
157
158    fn cache_users(&mut self, users: &[tl::enums::User]) {
159        for u in users { self.cache_user(u); }
160    }
161
162    fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
163        for c in chats { self.cache_chat(c); }
164    }
165
166    fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
167        if user_id == 0 {
168            return tl::enums::InputPeer::PeerSelf;
169        }
170        let hash = self.users.get(&user_id).copied().unwrap_or(0);
171        tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
172    }
173
174    fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
175        let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
176        tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
177    }
178
179    fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
180        match peer {
181            tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
182            tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
183                tl::types::InputPeerChat { chat_id: c.chat_id }
184            ),
185            tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
186        }
187    }
188}
189
190// ─── InputMessage builder ─────────────────────────────────────────────────────
191
/// Builder for composing outgoing messages.
///
/// ```rust,no_run
/// use layer_client::InputMessage;
///
/// let msg = InputMessage::text("Hello, *world*!")
///     .silent(true)
///     .reply_to(Some(42));
/// ```
#[derive(Clone, Default)]
pub struct InputMessage {
    // Message body text.
    pub text:         String,
    // Message ID to reply to, if any.
    pub reply_to:     Option<i32>,
    // Deliver without a notification sound.
    pub silent:       bool,
    // Send as a background message.
    pub background:   bool,
    // Clear the chat's draft after sending.
    pub clear_draft:  bool,
    // Suppress the link preview.
    pub no_webpage:   bool,
    // Formatting entities (bold, italic, code, links, …).
    pub entities:     Option<Vec<tl::enums::MessageEntity>>,
    // Inline or reply keyboard attached to the message.
    pub reply_markup: Option<tl::enums::ReplyMarkup>,
    // Unix timestamp to schedule the message for, if any.
    pub schedule_date: Option<i32>,
}
213
214impl InputMessage {
215    /// Create a message with the given text.
216    pub fn text(text: impl Into<String>) -> Self {
217        Self { text: text.into(), ..Default::default() }
218    }
219
220    /// Set the message text.
221    pub fn set_text(mut self, text: impl Into<String>) -> Self {
222        self.text = text.into(); self
223    }
224
225    /// Reply to a specific message ID.
226    pub fn reply_to(mut self, id: Option<i32>) -> Self {
227        self.reply_to = id; self
228    }
229
230    /// Send silently (no notification sound).
231    pub fn silent(mut self, v: bool) -> Self {
232        self.silent = v; self
233    }
234
235    /// Send in background.
236    pub fn background(mut self, v: bool) -> Self {
237        self.background = v; self
238    }
239
240    /// Clear the draft after sending.
241    pub fn clear_draft(mut self, v: bool) -> Self {
242        self.clear_draft = v; self
243    }
244
245    /// Disable link preview.
246    pub fn no_webpage(mut self, v: bool) -> Self {
247        self.no_webpage = v; self
248    }
249
250    /// Attach formatting entities (bold, italic, code, links, etc).
251    pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
252        self.entities = Some(e); self
253    }
254
255    /// Attach a reply markup (inline or reply keyboard).
256    pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
257        self.reply_markup = Some(rm); self
258    }
259
260    /// Shorthand for attaching an [`crate::keyboard::InlineKeyboard`].
261    ///
262    /// ```rust,no_run
263    /// use layer_client::{InputMessage, keyboard::{InlineKeyboard, Button}};
264    ///
265    /// let msg = InputMessage::text("Pick one:")
266    ///     .keyboard(InlineKeyboard::new()
267    ///         .row([Button::callback("A", b"a"), Button::callback("B", b"b")]));
268    /// ```
269    pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
270        self.reply_markup = Some(kb.into()); self
271    }
272
273    /// Schedule the message for a future Unix timestamp.
274    pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
275        self.schedule_date = ts; self
276    }
277
278    fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
279        self.reply_to.map(|id| {
280            tl::enums::InputReplyTo::Message(
281                tl::types::InputReplyToMessage {
282                    reply_to_msg_id: id,
283                    top_msg_id: None,
284                    reply_to_peer_id: None,
285                    quote_text: None,
286                    quote_entities: None,
287                    quote_offset: None,
288                    monoforum_peer_id: None,
289                    todo_item_id: None,
290                }
291            )
292        })
293    }
294}
295
296impl From<&str> for InputMessage {
297    fn from(s: &str) -> Self { Self::text(s) }
298}
299
300impl From<String> for InputMessage {
301    fn from(s: String) -> Self { Self::text(s) }
302}
303
304// ─── TransportKind ────────────────────────────────────────────────────────────
305
/// Which MTProto transport framing to use for all connections.
///
/// Chosen via [`Config::transport`]; the same framing is applied to every
/// connection the client opens (including reconnects).
///
/// | Variant | Init bytes | Notes |
/// |---------|-----------|-------|
/// | `Abridged` | `0xef` | Default, smallest overhead |
/// | `Intermediate` | `0xeeeeeeee` | Better proxy compat |
/// | `Full` | none | Adds seqno + CRC32 |
/// | `Obfuscated` | random 64B | Bypasses DPI / MTProxy |
#[derive(Clone, Debug, Default)]
pub enum TransportKind {
    /// MTProto [Abridged] transport — length prefix is 1 or 4 bytes.
    ///
    /// [Abridged]: https://core.telegram.org/mtproto/mtproto-transports#abridged
    #[default]
    Abridged,
    /// MTProto [Intermediate] transport — 4-byte LE length prefix.
    ///
    /// [Intermediate]: https://core.telegram.org/mtproto/mtproto-transports#intermediate
    Intermediate,
    /// MTProto [Full] transport — 4-byte length + seqno + CRC32.
    ///
    /// [Full]: https://core.telegram.org/mtproto/mtproto-transports#full
    Full,
    /// [Obfuscated2] transport — XOR stream cipher over Abridged framing.
    /// Required for MTProxy and networks with deep-packet inspection.
    ///
    /// `secret` is the 16-byte proxy secret, or `None` for keyless obfuscation.
    ///
    /// [Obfuscated2]: https://core.telegram.org/mtproto/mtproto-transports#obfuscated-2
    Obfuscated { secret: Option<[u8; 16]> },
}
337
338// ─── Config ───────────────────────────────────────────────────────────────────
339
/// A token that can be used to gracefully shut down a [`Client`].
///
/// Obtained from [`Client::connect`] — call [`ShutdownToken::cancel`] to begin
/// graceful shutdown. All pending requests will finish and the reader task will
/// exit cleanly. Shutdown is cooperative: the reader task observes the token
/// rather than being aborted.
///
/// # Example
/// ```rust,no_run
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use layer_client::{Client, Config, ShutdownToken};
///
/// let (client, shutdown) = Client::connect(Config::default()).await?;
///
/// // In a signal handler or background task:
/// // shutdown.cancel();
/// # Ok(()) }
/// ```
pub type ShutdownToken = CancellationToken;
358
/// Configuration for [`Client::connect`].
#[derive(Clone)]
pub struct Config {
    /// Telegram API ID (obtained from https://my.telegram.org).
    pub api_id:         i32,
    /// Telegram API hash paired with `api_id`.
    pub api_hash:       String,
    /// Optional data-center address override — NOTE(review): not read by the
    /// code visible in this chunk; confirm where it is consumed.
    pub dc_addr:        Option<String>,
    /// Policy governing automatic retries (e.g. `FLOOD_WAIT` handling).
    pub retry_policy:   Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy — every Telegram connection is tunnelled through it.
    pub socks5:         Option<crate::socks5::Socks5Config>,
    /// Allow IPv6 DC addresses when populating the DC table (default: false).
    pub allow_ipv6:     bool,
    /// Which MTProto transport framing to use (default: Abridged).
    pub transport:      TransportKind,
    /// Session persistence backend (default: binary file `"layer.session"`).
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// If `true`, replay missed updates via `updates.getDifference` immediately
    /// after connecting. Mirrors grammers' `UpdatesConfiguration { catch_up: true }`.
    /// Default: `false`.
    pub catch_up:       bool,
}
379
380impl Default for Config {
381    fn default() -> Self {
382        Self {
383            api_id:          0,
384            api_hash:        String::new(),
385            dc_addr:         None,
386            retry_policy:    Arc::new(AutoSleep::default()),
387            socks5:          None,
388            allow_ipv6:      false,
389            transport:       TransportKind::Abridged,
390            session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
391            catch_up:        false,
392        }
393    }
394}
395
396// ─── UpdateStream ─────────────────────────────────────────────────────────────
397
/// Asynchronous stream of [`Update`]s.
///
/// Backed by an unbounded channel; the sending half lives in `ClientInner`
/// and is fed by the client's background reader task.
pub struct UpdateStream {
    rx: mpsc::UnboundedReceiver<update::Update>,
}

impl UpdateStream {
    /// Wait for the next update. Returns `None` when the client has disconnected.
    pub async fn next(&mut self) -> Option<update::Update> {
        self.rx.recv().await
    }
}
409
410// ─── Dialog ───────────────────────────────────────────────────────────────────
411
/// A Telegram dialog (chat, user, channel).
#[derive(Debug, Clone)]
pub struct Dialog {
    // The raw TL dialog object (dialog or folder).
    pub raw:     tl::enums::Dialog,
    // The dialog's last message, when the server provided it.
    pub message: Option<tl::enums::Message>,
    // The user entity for one-to-one dialogs, when available.
    pub entity:  Option<tl::enums::User>,
    // The chat/channel entity for group dialogs, when available.
    pub chat:    Option<tl::enums::Chat>,
}
420
421impl Dialog {
422    /// The dialog's display title.
423    pub fn title(&self) -> String {
424        if let Some(tl::enums::User::User(u)) = &self.entity {
425            let first = u.first_name.as_deref().unwrap_or("");
426            let last  = u.last_name.as_deref().unwrap_or("");
427            let name  = format!("{first} {last}").trim().to_string();
428            if !name.is_empty() { return name; }
429        }
430        if let Some(chat) = &self.chat {
431            return match chat {
432                tl::enums::Chat::Chat(c)         => c.title.clone(),
433                tl::enums::Chat::Forbidden(c) => c.title.clone(),
434                tl::enums::Chat::Channel(c)      => c.title.clone(),
435                tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
436                tl::enums::Chat::Empty(_)        => "(empty)".into(),
437            };
438        }
439        "(Unknown)".to_string()
440    }
441
442    /// Peer of this dialog.
443    pub fn peer(&self) -> Option<&tl::enums::Peer> {
444        match &self.raw {
445            tl::enums::Dialog::Dialog(d) => Some(&d.peer),
446            tl::enums::Dialog::Folder(_) => None,
447        }
448    }
449
450    /// Unread message count.
451    pub fn unread_count(&self) -> i32 {
452        match &self.raw {
453            tl::enums::Dialog::Dialog(d) => d.unread_count,
454            _ => 0,
455        }
456    }
457
458    /// ID of the top message.
459    pub fn top_message(&self) -> i32 {
460        match &self.raw {
461            tl::enums::Dialog::Dialog(d) => d.top_message,
462            _ => 0,
463        }
464    }
465}
466
467// ─── ClientInner ─────────────────────────────────────────────────────────────
468
/// Shared client state. [`Client`] is a cheap-to-clone `Arc` around this.
struct ClientInner {
    /// Write half of the connection — holds the EncryptedSession (for packing)
    /// and the send half of the TCP stream. The read half is owned by the
    /// reader task started in connect().
    writer:          Mutex<ConnectionWriter>,
    /// Pending RPC replies, keyed by MTProto msg_id.
    /// RPC callers insert a oneshot::Sender here before sending; the reader
    /// task routes incoming rpc_result frames to the matching sender.
    pending:         Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Channel used to hand a new (OwnedReadHalf, FrameKind, auth_key, session_id)
    /// to the reader task after a reconnect.
    reconnect_tx:    mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Send `()` here to wake the reader's reconnect backoff loop immediately.
    /// Used by [`Client::signal_network_restored`].
    network_hint_tx: mpsc::UnboundedSender<()>,
    /// Cancelled to signal graceful shutdown to the reader task.
    #[allow(dead_code)]
    shutdown_token:  CancellationToken,
    /// Whether to replay missed updates via getDifference on connect.
    #[allow(dead_code)]
    catch_up:        bool,
    /// ID of the home data center (the one holding our authorization).
    home_dc_id:      Mutex<i32>,
    /// Known DC addresses and per-DC session material, keyed by DC ID.
    dc_options:      Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache for users/channels (see [`PeerCache`]).
    pub(crate) peer_cache:    Mutex<PeerCache>,
    /// Update-sequence (pts) bookkeeping used for gap detection / catch-up.
    pub(crate) pts_state:     Mutex<pts::PtsState>,
    api_id:          i32,
    api_hash:        String,
    /// Retry policy applied to RPC invocations (e.g. FLOOD_WAIT handling).
    retry_policy:    Arc<dyn RetryPolicy>,
    socks5:          Option<crate::socks5::Socks5Config>,
    allow_ipv6:      bool,
    transport:       TransportKind,
    /// Session persistence backend used by `save_session` / `connect`.
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Pool of worker-DC connections (e.g. for media on other DCs).
    dc_pool:         Mutex<dc_pool::DcPool>,
    /// Sending half of the update stream consumed via [`UpdateStream`].
    update_tx:       mpsc::UnboundedSender<update::Update>,
}
504
/// The main Telegram client. Cheap to clone — internally Arc-wrapped.
#[derive(Clone)]
pub struct Client {
    pub(crate) inner: Arc<ClientInner>,
    // Kept alive so the unbounded update channel is not closed while the
    // client exists; handed out to consumers as an [`UpdateStream`].
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
511
512impl Client {
513    // ── Connect ────────────────────────────────────────────────────────────
514
    /// Establish a connection to Telegram and spawn the background reader task.
    ///
    /// Tries to resume a persisted session from [`Config::session_backend`];
    /// when no usable session (or auth key) exists, or the stored session
    /// fails to connect, falls back to a fresh handshake against the default
    /// DC. Returns the [`Client`] together with a [`ShutdownToken`] that can
    /// be cancelled for graceful shutdown.
    ///
    /// # Errors
    /// Returns [`InvocationError`] if the session backend fails, the TCP/
    /// handshake connection cannot be established, or `init_connection`
    /// fails even after a fresh-connect retry.
    pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
        let (update_tx, update_rx) = mpsc::unbounded_channel();

        // ── Load or fresh-connect ───────────────────────────────────────
        let socks5    = config.socks5.clone();
        let transport = config.transport.clone();

        let (conn, home_dc_id, dc_opts) =
            match config.session_backend.load()
                .map_err(InvocationError::Io)?
            {
                Some(s) => {
                    if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
                        if let Some(key) = dc.auth_key {
                            log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
                            match Connection::connect_with_key(
                                &dc.addr, key, dc.first_salt, dc.time_offset,
                                socks5.as_ref(), &transport,
                            ).await {
                                Ok(c) => {
                                    // Start from the well-known DC table, then let
                                    // persisted entries (with keys/salts) override it.
                                    let mut opts = session::default_dc_addresses()
                                        .into_iter()
                                        .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
                                        .collect::<HashMap<_, _>>();
                                    for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
                                    (c, s.home_dc_id, opts)
                                }
                                Err(e) => {
                                    log::warn!("[layer] Session connect failed ({e}), fresh connect …");
                                    Self::fresh_connect(socks5.as_ref(), &transport).await?
                                }
                            }
                        } else {
                            Self::fresh_connect(socks5.as_ref(), &transport).await?
                        }
                    } else {
                        Self::fresh_connect(socks5.as_ref(), &transport).await?
                    }
                }
                None => Self::fresh_connect(socks5.as_ref(), &transport).await?,
            };

        // ── Build DC pool ───────────────────────────────────────────────
        let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());

        // Split the TCP stream immediately.
        // The writer (write half + EncryptedSession) stays in ClientInner.
        // The read half goes to the reader task which we spawn right now so
        // that RPC calls during init_connection work correctly.
        let (writer, read_half, frame_kind) = conn.into_writer();
        let auth_key   = writer.enc.auth_key_bytes();
        let session_id = writer.enc.session_id();

        let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Channel the reconnect logic uses to hand a new read half to the reader task.
        let (reconnect_tx, reconnect_rx) =
            mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();

        // Channel for external "network restored" hints — lets Android/iOS callbacks
        // skip the reconnect backoff and attempt immediately.
        let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();


        // Graceful shutdown token — cancel this to stop the reader task cleanly.
        let shutdown_token = CancellationToken::new();
        let catch_up = config.catch_up;

        let inner = Arc::new(ClientInner {
            writer:          Mutex::new(writer),
            pending:         pending.clone(),
            reconnect_tx,
            network_hint_tx,
            shutdown_token:  shutdown_token.clone(),
            catch_up,
            home_dc_id:      Mutex::new(home_dc_id),
            dc_options:      Mutex::new(dc_opts),
            peer_cache:      Mutex::new(PeerCache::default()),
            pts_state:       Mutex::new(pts::PtsState::default()),
            api_id:          config.api_id,
            api_hash:        config.api_hash,
            retry_policy:    config.retry_policy,
            socks5:          config.socks5,
            allow_ipv6:      config.allow_ipv6,
            transport:       config.transport,
            session_backend: config.session_backend,
            dc_pool:         Mutex::new(pool),
            update_tx:       update_tx,
        });

        let client = Self {
            inner,
            _update_rx: Arc::new(Mutex::new(update_rx)),
        };

        // Spawn the reader task immediately so that RPC calls during
        // init_connection can receive their responses.
        {
            let client_r = client.clone();
            let shutdown_r = shutdown_token.clone();
            tokio::spawn(async move {
                client_r.run_reader_task(
                    read_half, frame_kind, auth_key, session_id,
                    reconnect_rx, network_hint_rx, shutdown_r,
                ).await;
            });
        }

        // If init_connection fails (e.g. stale auth key rejected by Telegram),
        // do a fresh DH handshake and retry once.
        if let Err(e) = client.init_connection().await {
            log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");

            let socks5_r    = client.inner.socks5.clone();
            let transport_r = client.inner.transport.clone();
            let (new_conn, new_dc_id, new_opts) =
                Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;

            {
                let mut dc_guard = client.inner.home_dc_id.lock().await;
                *dc_guard = new_dc_id;
            }
            {
                let mut opts_guard = client.inner.dc_options.lock().await;
                *opts_guard = new_opts;
            }

            // Replace writer and hand new read half to reader task
            let (new_writer, new_read, new_fk) = new_conn.into_writer();
            let new_ak = new_writer.enc.auth_key_bytes();
            let new_sid = new_writer.enc.session_id();
            *client.inner.writer.lock().await = new_writer;
            let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

            client.init_connection().await?;
        }

        // Best-effort: seed the pts state so update gap detection has a baseline.
        let _ = client.sync_pts_state().await;

        // If catch_up is enabled, replay any missed updates immediately.
        if catch_up {
            let c = client.clone();
            let utx = client.inner.update_tx.clone();
            tokio::spawn(async move {
                if let Ok(missed) = c.get_difference().await {
                    for u in missed { let _ = utx.send(u); }
                }
            });
        }

        Ok((client, shutdown_token))
    }
668
669    async fn fresh_connect(
670        socks5:    Option<&crate::socks5::Socks5Config>,
671        transport: &TransportKind,
672    ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
673        log::info!("[layer] Fresh connect to DC2 …");
674        let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
675        let opts = session::default_dc_addresses()
676            .into_iter()
677            .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
678            .collect();
679        Ok((conn, 2, opts))
680    }
681
682    // ── Session ────────────────────────────────────────────────────────────
683
    /// Persist the current session (DC table, auth keys, salts, time offsets)
    /// through the configured session backend.
    ///
    /// The home DC's entry is refreshed from the live writer (current auth
    /// key / salt / time offset); worker-DC keys are collected from the pool.
    ///
    /// # Errors
    /// Returns [`InvocationError::Io`] if the backend fails to write.
    pub async fn save_session(&self) -> Result<(), InvocationError> {
        let writer_guard = self.inner.writer.lock().await;
        let home_dc_id   = *self.inner.home_dc_id.lock().await;
        let dc_options   = self.inner.dc_options.lock().await;

        let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
            dc_id:       e.dc_id,
            addr:        e.addr.clone(),
            auth_key:    if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
            first_salt:  if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
            time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
        }).collect();
        // Collect auth keys from worker DCs in the pool
        self.inner.dc_pool.lock().await.collect_keys(&mut dcs);

        self.inner.session_backend
            .save(&PersistedSession { home_dc_id, dcs })
            .map_err(InvocationError::Io)?;
        log::info!("[layer] Session saved ✓");
        Ok(())
    }
705
706    // ── Auth ───────────────────────────────────────────────────────────────
707
708    /// Returns `true` if the client is already authorized.
709    pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
710        match self.invoke(&tl::functions::updates::GetState {}).await {
711            Ok(_)  => Ok(true),
712            Err(e) if e.is("AUTH_KEY_UNREGISTERED")
713                   || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
714            Err(e) => Err(e),
715        }
716    }
717
718    /// Sign in as a bot.
719    pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
720        let req = tl::functions::auth::ImportBotAuthorization {
721            flags: 0, api_id: self.inner.api_id,
722            api_hash: self.inner.api_hash.clone(),
723            bot_auth_token: token.to_string(),
724        };
725
726        let result = match self.invoke(&req).await {
727            Ok(x) => x,
728            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
729                let dc_id = r.value.unwrap_or(2) as i32;
730                self.migrate_to(dc_id).await?;
731                self.invoke(&req).await?
732            }
733            Err(e) => return Err(e),
734        };
735
736        let name = match result {
737            tl::enums::auth::Authorization::Authorization(a) => {
738                self.cache_user(&a.user).await;
739                Self::extract_user_name(&a.user)
740            }
741            tl::enums::auth::Authorization::SignUpRequired(_) => {
742                panic!("unexpected SignUpRequired during bot sign-in")
743            }
744        };
745        log::info!("[layer] Bot signed in ✓  ({name})");
746        Ok(name)
747    }
748
    /// Request a login code for a user account.
    ///
    /// Builds the send-code request via `make_send_code_req` and transparently
    /// handles RPC error 303 (phone migrated to another DC) by migrating and
    /// retrying once. Returns a [`LoginToken`] carrying the `phone_code_hash`
    /// that must be passed to [`Client::sign_in`].
    ///
    /// # Errors
    /// Returns [`InvocationError`] on RPC failure, on an unexpected `Success`
    /// response, or when Telegram demands payment to deliver the code.
    pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
        use tl::enums::auth::SentCode;

        let req = self.make_send_code_req(phone);
        let body = match self.rpc_call_raw(&req).await {
            Ok(b) => b,
            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
                // PHONE_MIGRATE_X: the account lives on another DC.
                let dc_id = r.value.unwrap_or(2) as i32;
                self.migrate_to(dc_id).await?;
                self.rpc_call_raw(&req).await?
            }
            Err(e) => return Err(e),
        };

        let mut cur = Cursor::from_slice(&body);
        let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
            SentCode::SentCode(s) => s.phone_code_hash,
            SentCode::Success(_)  => return Err(InvocationError::Deserialize("unexpected Success".into())),
            SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
        };
        log::info!("[layer] Login code sent");
        Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
    }
773
774    /// Complete sign-in with the code sent to the phone.
775    pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
776        let req = tl::functions::auth::SignIn {
777            phone_number:    token.phone.clone(),
778            phone_code_hash: token.phone_code_hash.clone(),
779            phone_code:      Some(code.trim().to_string()),
780            email_verification: None,
781        };
782
783        let body = match self.rpc_call_raw(&req).await {
784            Ok(b) => b,
785            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
786                let dc_id = r.value.unwrap_or(2) as i32;
787                self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
788                self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
789            }
790            Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
791                let t = self.get_password_info().await.map_err(SignInError::Other)?;
792                return Err(SignInError::PasswordRequired(t));
793            }
794            Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
795            Err(e) => return Err(SignInError::Other(e)),
796        };
797
798        let mut cur = Cursor::from_slice(&body);
799        match tl::enums::auth::Authorization::deserialize(&mut cur)
800            .map_err(|e| SignInError::Other(e.into()))?
801        {
802            tl::enums::auth::Authorization::Authorization(a) => {
803                self.cache_user(&a.user).await;
804                let name = Self::extract_user_name(&a.user);
805                log::info!("[layer] Signed in ✓  Welcome, {name}!");
806                Ok(name)
807            }
808            tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
809        }
810    }
811
812    /// Complete 2FA login.
813    pub async fn check_password(
814        &self,
815        token:    PasswordToken,
816        password: impl AsRef<[u8]>,
817    ) -> Result<String, InvocationError> {
818        let pw   = token.password;
819        let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
820        let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
821        let g_b  = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
822        let a    = pw.secure_random;
823        let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
824
825        let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
826        let req = tl::functions::auth::CheckPassword {
827            password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
828                tl::types::InputCheckPasswordSrp {
829                    srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
830                },
831            ),
832        };
833
834        let body = self.rpc_call_raw(&req).await?;
835        let mut cur = Cursor::from_slice(&body);
836        match tl::enums::auth::Authorization::deserialize(&mut cur)? {
837            tl::enums::auth::Authorization::Authorization(a) => {
838                self.cache_user(&a.user).await;
839                let name = Self::extract_user_name(&a.user);
840                log::info!("[layer] 2FA ✓  Welcome, {name}!");
841                Ok(name)
842            }
843            tl::enums::auth::Authorization::SignUpRequired(_) =>
844                Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
845        }
846    }
847
848    /// Sign out and invalidate the current session.
849    pub async fn sign_out(&self) -> Result<bool, InvocationError> {
850        let req = tl::functions::auth::LogOut {};
851        match self.rpc_call_raw(&req).await {
852            Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
853            Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
854            Err(e) => Err(e),
855        }
856    }
857
858    // ── Get self ───────────────────────────────────────────────────────────
859
860    /// Fetch information about the logged-in user.
861    pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
862        let req = tl::functions::users::GetUsers {
863            id: vec![tl::enums::InputUser::UserSelf],
864        };
865        let body    = self.rpc_call_raw(&req).await?;
866        let mut cur = Cursor::from_slice(&body);
867        let users   = Vec::<tl::enums::User>::deserialize(&mut cur)?;
868        self.cache_users_slice(&users).await;
869        users.into_iter().find_map(|u| match u {
870            tl::enums::User::User(u) => Some(u),
871            _ => None,
872        }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
873    }
874
875    // ── Updates ────────────────────────────────────────────────────────────
876
877    /// Return an [`UpdateStream`] that yields incoming [`Update`]s.
878    ///
879    /// The reader task (started inside `connect()`) sends all updates to
880    /// `inner.update_tx`. This method proxies those updates into a fresh
881    /// caller-owned channel — typically called once per bot/app loop.
882    pub fn stream_updates(&self) -> UpdateStream {
883        let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
884        // The internal channel is: reader_task → update_tx → _update_rx
885        // We forward _update_rx → caller_tx so the caller gets an owned stream.
886        let internal_rx = self._update_rx.clone();
887        tokio::spawn(async move {
888            // Lock once and hold — this is the sole consumer of the internal channel.
889            let mut guard = internal_rx.lock().await;
890            while let Some(upd) = guard.recv().await {
891                if caller_tx.send(upd).is_err() { break; }
892            }
893        });
894        UpdateStream { rx }
895    }
896
897    // ── Network hint ───────────────────────────────────────────────────────
898
899    /// Signal that network connectivity has been restored.
900    ///
901    /// Call this from platform network-change callbacks — Android's
902    /// `ConnectivityManager`, iOS `NWPathMonitor`, or any other OS hook —
903    /// to make the client attempt an immediate reconnect instead of waiting
904    /// for the exponential backoff timer to expire.
905    ///
906    /// Safe to call at any time: if the connection is healthy the hint is
907    /// silently ignored by the reader task; if it is in a backoff loop it
908    /// wakes up and tries again right away.
909    pub fn signal_network_restored(&self) {
910        let _ = self.inner.network_hint_tx.send(());
911    }
912
913    // ── Reader task ────────────────────────────────────────────────────────
914    // Decrypts frames without holding any lock, then routes:
915    //   rpc_result  → pending map (oneshot to waiting RPC caller)
916    //   update      → update_tx  (delivered to stream_updates consumers)
917    //   bad_server_salt → updates writer salt
918    //
919    // On error: drains pending with Io errors (so AutoSleep retries callers),
920    // then loops with exponential backoff until reconnect succeeds.
921    // network_hint_rx lets external callers (Android/iOS) skip the backoff.
922    //
923    // DC migration / reconnect: the new read half arrives via new_conn_rx.
924    // The select! between recv_frame_owned and new_conn_rx.recv() ensures we
925    // switch to the new connection immediately, without waiting for the next
926    // frame on the old (now stale) connection.
927
928    // ── Reader task supervisor ────────────────────────────────────────────────
929    //
930    // run_reader_task is the outer supervisor. It wraps reader_loop in a
931    // restart loop so that if reader_loop ever exits for any reason other than
932    // a clean shutdown request, it is automatically reconnected and restarted.
933    //
934    // This mirrors what Telegram Desktop does: the network thread is considered
935    // infrastructure and is never allowed to die permanently.
936    //
937    // Restart sequence on unexpected exit:
938    //   1. Drain all pending RPCs with ConnectionReset so callers unblock.
939    //   2. Exponential-backoff reconnect loop (500 ms → 30 s cap) until TCP
940    //      succeeds, respecting the shutdown token at every sleep point.
941    //   3. Spawn init_connection in a background task (same deadlock-safe
942    //      pattern as do_reconnect_loop) and pass the oneshot receiver as the
943    //      initial_init_rx to the restarted reader_loop.
944    //   4. reader_loop picks up init_rx immediately on its first iteration and
945    //      handles success/failure exactly like a mid-session reconnect.
946    async fn run_reader_task(
947        &self,
948        read_half:   OwnedReadHalf,
949        frame_kind:  FrameKind,
950        auth_key:    [u8; 256],
951        session_id:  i64,
952        mut new_conn_rx:      mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
953        mut network_hint_rx:  mpsc::UnboundedReceiver<()>,
954        shutdown_token:       CancellationToken,
955    ) {
956        let mut rh  = read_half;
957        let mut fk  = frame_kind;
958        let mut ak  = auth_key;
959        let mut sid = session_id;
960        // On first start no init is needed (connect() already called it).
961        // On restarts we pass the spawned init task so reader_loop handles it.
962        let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
963        let mut restart_count: u32 = 0;
964
965        loop {
966            tokio::select! {
967                // ── Clean shutdown ────────────────────────────────────────────
968                _ = shutdown_token.cancelled() => {
969                    log::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
970                    let mut pending = self.inner.pending.lock().await;
971                    for (_, tx) in pending.drain() {
972                        let _ = tx.send(Err(InvocationError::Dropped));
973                    }
974                    return;
975                }
976
977                // ── reader_loop ───────────────────────────────────────────────
978                _ = self.reader_loop(
979                        rh, fk, ak, sid,
980                        restart_init_rx.take(),
981                        &mut new_conn_rx, &mut network_hint_rx,
982                    ) => {}
983            }
984
985            // If we reach here, reader_loop returned without a shutdown signal.
986            // This should never happen in normal operation — treat it as a fault.
987            if shutdown_token.is_cancelled() {
988                log::info!("[layer] Reader task: exiting after loop (shutdown).");
989                return;
990            }
991
992            restart_count += 1;
993            log::error!(
994                "[layer] Reader loop exited unexpectedly (restart #{restart_count}) —                  supervisor reconnecting …"
995            );
996
997            // Step 1: drain all pending RPCs so callers don't hang.
998            {
999                let mut pending = self.inner.pending.lock().await;
1000                for (_, tx) in pending.drain() {
1001                    let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
1002                        std::io::ErrorKind::ConnectionReset,
1003                        "reader task restarted",
1004                    ))));
1005                }
1006            }
1007
1008            // Step 2: reconnect with exponential backoff, honouring shutdown.
1009            let mut delay_ms = RECONNECT_BASE_MS;
1010            let new_conn = loop {
1011                log::info!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
1012                tokio::select! {
1013                    _ = shutdown_token.cancelled() => {
1014                        log::info!("[layer] Supervisor: shutdown during reconnect, exiting.");
1015                        return;
1016                    }
1017                    _ = sleep(Duration::from_millis(delay_ms)) => {}
1018                }
1019
1020                // do_reconnect ignores both params (_old_auth_key, _old_frame_kind) —
1021                // it re-reads everything from ClientInner. rh/fk/ak/sid were moved
1022                // into reader_loop, so we pass dummies here; fresh values come back
1023                // from the Ok result and replace them below.
1024                let dummy_ak = [0u8; 256];
1025                let dummy_fk = FrameKind::Abridged;
1026                match self.do_reconnect(&dummy_ak, &dummy_fk).await {
1027                    Ok(conn) => break conn,
1028                    Err(e)   => {
1029                        log::warn!("[layer] Supervisor: reconnect failed ({e})");
1030                        let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
1031                        delay_ms = jitter_delay(next).as_millis() as u64;
1032                    }
1033                }
1034            };
1035
1036            let (new_rh, new_fk, new_ak, new_sid) = new_conn;
1037            rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
1038
1039            // Step 3: spawn init_connection (cannot await inline — reader must
1040            // be running to route the RPC response, or we deadlock).
1041            let (init_tx, init_rx) = oneshot::channel();
1042            let c   = self.clone();
1043            let utx = self.inner.update_tx.clone();
1044            tokio::spawn(async move {
1045                // Respect FLOOD_WAIT (same as do_reconnect_loop).
1046                let result = loop {
1047                    match c.init_connection().await {
1048                        Ok(()) => break Ok(()),
1049                        Err(InvocationError::Rpc(ref r))
1050                            if r.flood_wait_seconds().is_some() =>
1051                        {
1052                            let secs = r.flood_wait_seconds().unwrap();
1053                            log::warn!(
1054                                "[layer] Supervisor init_connection FLOOD_WAIT_{secs} — waiting"
1055                            );
1056                            sleep(Duration::from_secs(secs + 1)).await;
1057                        }
1058                        Err(e) => break Err(e),
1059                    }
1060                };
1061                if result.is_ok() {
1062                    if let Ok(missed) = c.get_difference().await {
1063                        for u in missed { let _ = utx.send(u); }
1064                    }
1065                }
1066                let _ = init_tx.send(result);
1067            });
1068            restart_init_rx = Some(init_rx);
1069
1070            log::info!("[layer] Supervisor: restarting reader loop (restart #{restart_count}) …");
1071            // Loop back → reader_loop restarts with the fresh connection.
1072        }
1073    }
1074
    /// Core frame pump: receives, decrypts, and routes MTProto frames until a
    /// deliberate exit (channel closed) — the supervisor restarts it otherwise.
    ///
    /// The `select!` juggles three event sources:
    /// 1. incoming frames (with an application-level keepalive),
    /// 2. replacement connections pushed in by DC migration / reconnect,
    /// 3. the result of a previously spawned `init_connection` task.
    async fn reader_loop(
        &self,
        mut rh:          OwnedReadHalf,
        mut fk:          FrameKind,
        mut ak:          [u8; 256],
        mut sid:         i64,
        // When Some, the supervisor has already spawned init_connection on our
        // behalf (supervisor restart path). On first start this is None.
        initial_init_rx:  Option<oneshot::Receiver<Result<(), InvocationError>>>,
        new_conn_rx:      &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
        network_hint_rx:  &mut mpsc::UnboundedReceiver<()>,
    ) {
        // Tracks an in-flight init_connection task spawned after every reconnect.
        // The reader loop must keep routing frames while we wait so the RPC
        // response can reach its oneshot sender (otherwise → 30 s self-deadlock).
        // If init fails we re-enter the reconnect loop immediately.
        let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
        // Count of consecutive *transient* init_connection failures on the
        // current auth key. NOTE(review): as written this counter is only
        // incremented and logged — the key is cleared exclusively on a
        // definitive bad-key signal (-404 / early EOF, see the Ok(Err(e)) arm
        // below), never from this count alone.
        let mut init_fail_count: u32 = 0;

        loop {
            tokio::select! {
                // ── Normal frame (or application-level keepalive timeout) ─────
                outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
                    match outcome {
                        FrameOutcome::Frame(mut raw) => {
                            let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
                                Ok(m)  => m,
                                // A frame that fails decryption is dropped, not fatal:
                                // the connection itself is still believed healthy.
                                Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
                            };
                            // Adopt the server-provided salt for future writes.
                            if msg.salt != 0 {
                                self.inner.writer.lock().await.enc.salt = msg.salt;
                            }
                            self.route_frame(msg.body).await;
                        }

                        FrameOutcome::Error(e) => {
                            log::warn!("[layer] Reader: connection error: {e}");
                            drop(init_rx.take()); // discard any in-flight init

                            // Fail all in-flight RPCs immediately so AutoSleep
                            // retries them as soon as we reconnect.
                            {
                                let mut pending = self.inner.pending.lock().await;
                                let msg = e.to_string();
                                for (_, tx) in pending.drain() {
                                    let _ = tx.send(Err(InvocationError::Io(
                                        std::io::Error::new(
                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
                                }
                            }

                            match self.do_reconnect_loop(
                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
                                network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None     => return, // shutdown requested
                            }
                        }

                        FrameOutcome::Keepalive => {} // ping sent successfully; loop
                    }
                }

                // ── DC migration / deliberate reconnect ──────────────────────
                maybe = new_conn_rx.recv() => {
                    if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
                        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
                        log::info!("[layer] Reader: switched to new connection.");
                    } else {
                        break; // reconnect_tx dropped → client is shutting down
                    }
                }


                // ── init_connection result (polled only when Some) ────────────
                // The `if init_rx.is_some()` guard makes the unwrap safe; the
                // receiver is polled by &mut so it is not consumed if another
                // branch wins the race.
                init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
                    init_rx = None;
                    match init_result {
                        Ok(Ok(())) => {
                            init_fail_count = 0;
                            // Fully initialised. Persist updated DC/session state.
                            // Retry up to 3 times — a transient I/O error (e.g. the
                            // filesystem briefly unavailable on Android) should not
                            // cause the auth key to diverge from disk permanently.
                            log::info!("[layer] Reconnected to Telegram ✓ — session live, replaying missed updates …");
                            for attempt in 1u8..=3 {
                                match self.save_session().await {
                                    Ok(()) => break,
                                    Err(e) if attempt < 3 => {
                                        log::warn!(
                                            "[layer] save_session failed (attempt {attempt}/3): {e}"
                                        );
                                        sleep(Duration::from_millis(500)).await;
                                    }
                                    Err(e) => {
                                        log::error!(
                                            "[layer] save_session permanently failed after 3 attempts: {e}"
                                        );
                                    }
                                }
                            }
                        }

                        Ok(Err(e)) => {
                            // TCP connected but init RPC failed.
                            //
                            // Grammers' approach: only treat the key as stale when
                            // Telegram sends a definitive "bad auth key" signal —
                            // a -404 transport error code.  A 30 s timeout is almost
                            // always a transient network issue and must NOT clear the
                            // session (doing so creates a brand-new session in Telegram's
                            // active devices list and orphans the old one).
                            let key_is_stale = match &e {
                                // Telegram's abridged-transport error code for invalid key
                                InvocationError::Rpc(r) if r.code == -404 => true,
                                // Early EOF immediately after connecting = server rejected key
                                InvocationError::Io(io) if io.kind() == std::io::ErrorKind::UnexpectedEof
                                    || io.kind() == std::io::ErrorKind::ConnectionReset => true,
                                // 30 s timeout → transient; keep the key and retry
                                _ => false,
                            };

                            if key_is_stale {
                                log::warn!(
                                    "[layer] init_connection failed with definitive bad-key signal ({e}) \
                                     — clearing auth key for fresh DH …"
                                );
                                init_fail_count = 0;
                                // Dropping the stored key forces a fresh DH
                                // handshake on the next reconnect attempt.
                                let home_dc_id = *self.inner.home_dc_id.lock().await;
                                let mut opts = self.inner.dc_options.lock().await;
                                if let Some(entry) = opts.get_mut(&home_dc_id) {
                                    entry.auth_key = None;
                                }
                            } else {
                                init_fail_count += 1;
                                log::warn!(
                                    "[layer] init_connection failed transiently (attempt {init_fail_count}, {e}) \
                                     — retrying with same key …"
                                );
                            }
                            // Same pending-drain as the connection-error arm: unblock
                            // callers so their retry policies can kick in.
                            {
                                let mut pending = self.inner.pending.lock().await;
                                let msg = e.to_string();
                                for (_, tx) in pending.drain() {
                                    let _ = tx.send(Err(InvocationError::Io(
                                        std::io::Error::new(
                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
                                }
                            }
                            // initial_delay_ms = 0: TCP already worked, so retry
                            // init immediately instead of waiting a full backoff.
                            match self.do_reconnect_loop(
                                0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None     => return,
                            }
                        }

                        Err(_) => {
                            // init task was dropped (shouldn't normally happen).
                            log::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
                            match self.do_reconnect_loop(
                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
                                network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None     => return,
                            }
                        }
                    }
                }
            }
        }
    }
1253
    /// Route a decrypted MTProto frame body to either a pending RPC caller or update_tx.
    ///
    /// Dispatch is by the first 4 bytes (the TL constructor id, little-endian).
    /// Container and gzip constructors recurse via `Box::pin` (async recursion
    /// needs a boxed future). Unknown constructor ids are silently ignored.
    /// All offsets below follow the MTProto service-message wire layouts.
    async fn route_frame(&self, body: Vec<u8>) {
        // Too short to even hold a constructor id — nothing we can do.
        if body.len() < 4 { return; }
        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());

        match cid {
            ID_RPC_RESULT => {
                // body[4..12] = req_msg_id of the request this is answering
                if body.len() < 12 { return; }
                let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
                let inner = body[12..].to_vec();
                // Recursively unwrap the inner payload (handles gzip, containers, etc.)
                let result = unwrap_envelope(inner);
                // `remove` (not `get`) — each req_msg_id is answered exactly once.
                if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
                    let to_send = match result {
                        Ok(EnvelopeResult::Payload(p)) => Ok(p),
                        Ok(EnvelopeResult::Updates(us)) => {
                            // Write RPCs return Updates — forward them and signal success
                            for u in us { let _ = self.inner.update_tx.send(u); }
                            Ok(vec![])
                        }
                        Ok(EnvelopeResult::None) => Ok(vec![]),
                        Err(e) => Err(e),
                    };
                    // Receiver may have timed out / been dropped; that's fine.
                    let _ = tx.send(to_send);
                }
            }
            // ID_RPC_ERROR only appears as the INNER body of ID_RPC_RESULT.
            // At top level it has no req_msg_id — ignore it here (the ID_RPC_RESULT
            // handler above uses unwrap_envelope which correctly returns Err for it).
            ID_RPC_ERROR => {
                log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
            }
            ID_MSG_CONTAINER => {
                // Container of multiple inner messages — recurse into each
                // Layout: cid(4) count(4), then per message:
                //   msg_id(8) seqno(4) bytes(4) body(bytes)  — 16-byte header.
                if body.len() < 8 { return; }
                let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
                let mut pos = 8usize;
                for _ in 0..count {
                    // Bounds checks guard against a truncated/malformed container.
                    if pos + 16 > body.len() { break; }
                    let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
                    pos += 16;
                    if pos + inner_len > body.len() { break; }
                    let inner = body[pos..pos + inner_len].to_vec();
                    pos += inner_len;
                    Box::pin(self.route_frame(inner)).await;
                }
            }
            ID_GZIP_PACKED => {
                // gzip_packed#3072cfa1 packed_data:bytes — inflate and re-route.
                let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
                if let Ok(inflated) = gz_inflate(&bytes) {
                    Box::pin(self.route_frame(inflated)).await;
                }
            }
            ID_BAD_SERVER_SALT => {
                // Server is telling us to use a different salt — update writer enc
                if body.len() >= 16 {
                    let new_salt = i64::from_le_bytes(body[8..16].try_into().unwrap());
                    self.inner.writer.lock().await.enc.salt = new_salt;
                }
            }
            ID_PONG => {
                // Bare Pong is the server's reply to our Ping — NOT wrapped in rpc_result.
                // pong#347773c5 msg_id:long ping_id:long
                //   body[4..12] = msg_id of the original Ping → key in pending map
                if body.len() >= 20 {
                    let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
                    if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
                        let _ = tx.send(Ok(body));
                    }
                }
            }
            ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
            | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
            | ID_UPDATES_TOO_LONG => {
                // All six Updates constructors go through one parser; send
                // failures mean no stream_updates consumer exists — ignore.
                for u in update::parse_updates(&body) {
                    let _ = self.inner.update_tx.send(u);
                }
            }
            // Silently acknowledged service messages — pong, acks, etc.
            _ => {}
        }
    }
1337
1338    /// Loops with exponential backoff until a TCP+DH reconnect succeeds, then
1339    /// spawns `init_connection` in a background task and returns a oneshot
1340    /// receiver for its result.
1341    ///
1342    /// - `initial_delay_ms = RECONNECT_BASE_MS` for a fresh disconnect.
1343    /// - `initial_delay_ms = 0` when TCP already worked but init failed — we
1344    ///   want to retry init immediately rather than waiting another full backoff.
1345    ///
1346    /// Returns `None` if the shutdown token fires (caller should exit).
1347    async fn do_reconnect_loop(
1348        &self,
1349        initial_delay_ms: u64,
1350        rh:  &mut OwnedReadHalf,
1351        fk:  &mut FrameKind,
1352        ak:  &mut [u8; 256],
1353        sid: &mut i64,
1354        network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
1355    ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
1356        let mut delay_ms = if initial_delay_ms == 0 {
1357            // Caller explicitly requests an immediate first attempt (e.g. init
1358            // failed but TCP is up — no reason to wait before the next try).
1359            0
1360        } else {
1361            initial_delay_ms.max(RECONNECT_BASE_MS)
1362        };
1363        loop {
1364            log::info!("[layer] Reconnecting in {delay_ms} ms …");
1365            tokio::select! {
1366                _ = sleep(Duration::from_millis(delay_ms)) => {}
1367                hint = network_hint_rx.recv() => {
1368                    if hint.is_none() { return None; } // shutdown
1369                    log::info!("[layer] Network hint → skipping backoff, reconnecting now");
1370                }
1371            }
1372
1373            match self.do_reconnect(ak, fk).await {
1374                Ok((new_rh, new_fk, new_ak, new_sid)) => {
1375                    *rh = new_rh; *fk = new_fk; *ak = new_ak; *sid = new_sid;
1376                    log::info!("[layer] TCP reconnected ✓ — initialising session …");
1377
1378                    // Spawn init_connection. MUST NOT be awaited inline — the
1379                    // reader loop must resume so it can route the RPC response.
1380                    // We give back a oneshot so the reader can act on failure.
1381                    let (init_tx, init_rx) = oneshot::channel();
1382                    let c   = self.clone();
1383                    let utx = self.inner.update_tx.clone();
1384                    tokio::spawn(async move {
1385                        // Respect FLOOD_WAIT before sending the result back.
1386                        // Without this, a FLOOD_WAIT from Telegram during init
1387                        // would immediately re-trigger another reconnect attempt,
1388                        // which would itself hit FLOOD_WAIT — a ban spiral.
1389                        let result = loop {
1390                            match c.init_connection().await {
1391                                Ok(()) => break Ok(()),
1392                                Err(InvocationError::Rpc(ref r))
1393                                    if r.flood_wait_seconds().is_some() =>
1394                                {
1395                                    let secs = r.flood_wait_seconds().unwrap();
1396                                    log::warn!(
1397                                        "[layer] init_connection FLOOD_WAIT_{secs} —                                          waiting before retry"
1398                                    );
1399                                    sleep(Duration::from_secs(secs + 1)).await;
1400                                    // loop and retry init_connection
1401                                }
1402                                Err(e) => break Err(e),
1403                            }
1404                        };
1405                        if result.is_ok() {
1406                            // Replay any updates missed during the outage.
1407                            if let Ok(missed) = c.get_difference().await {
1408                                for u in missed { let _ = utx.send(u); }
1409                            }
1410                        }
1411                        let _ = init_tx.send(result);
1412                    });
1413                    return Some(init_rx);
1414                }
1415                Err(e) => {
1416                    log::warn!("[layer] Reconnect attempt failed: {e}");
1417                    // Cap at max, then apply ±20 % jitter to avoid thundering herd.
1418                    // Ensure the delay always advances by at least RECONNECT_BASE_MS
1419                    // so a 0 initial delay on the first attempt doesn't spin-loop.
1420                    let next = (delay_ms.saturating_mul(2).max(RECONNECT_BASE_MS))
1421                        .min(RECONNECT_MAX_SECS * 1_000);
1422                    delay_ms = jitter_delay(next).as_millis() as u64;
1423                }
1424            }
1425        }
1426    }
1427
1428    /// Reconnect to the home DC, replace the writer, and return the new read half.
1429    async fn do_reconnect(
1430        &self,
1431        _old_auth_key: &[u8; 256],
1432        _old_frame_kind: &FrameKind,
1433    ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
1434        let home_dc_id = *self.inner.home_dc_id.lock().await;
1435        let (addr, saved_key, first_salt, time_offset) = {
1436            let opts = self.inner.dc_options.lock().await;
1437            match opts.get(&home_dc_id) {
1438                Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
1439                None    => ("149.154.167.51:443".to_string(), None, 0, 0),
1440            }
1441        };
1442        let socks5    = self.inner.socks5.clone();
1443        let transport = self.inner.transport.clone();
1444
1445        let new_conn = if let Some(key) = saved_key {
1446            log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
1447            match Connection::connect_with_key(
1448                &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
1449            ).await {
1450                Ok(c)   => c,
1451                Err(e2) => {
1452                    log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
1453                    Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
1454                }
1455            }
1456        } else {
1457            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
1458        };
1459
1460        let (new_writer, new_read, new_fk) = new_conn.into_writer();
1461        let new_ak  = new_writer.enc.auth_key_bytes();
1462        let new_sid = new_writer.enc.session_id();
1463        *self.inner.writer.lock().await = new_writer;
1464
1465        // NOTE: init_connection() is intentionally NOT called here.
1466        //
1467        // do_reconnect() is always called from inside the reader loop's select!,
1468        // which means the reader task is blocked while this function runs.
1469        // init_connection() sends an RPC and awaits the response — but only the
1470        // reader task can route that response back to the pending caller.
1471        // Calling it here creates a self-deadlock that times out after 30 s.
1472        //
1473        // Instead, callers are responsible for spawning init_connection() in a
1474        // separate task AFTER the reader loop has resumed and can process frames.
1475
1476        Ok((new_read, new_fk, new_ak, new_sid))
1477    }
1478
1479    // ── Messaging ──────────────────────────────────────────────────────────
1480
1481    /// Send a text message. Use `"me"` for Saved Messages.
1482    pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
1483        let p = self.resolve_peer(peer).await?;
1484        self.send_message_to_peer(p, text).await
1485    }
1486
1487    /// Send a message to an already-resolved peer (plain text shorthand).
1488    pub async fn send_message_to_peer(
1489        &self,
1490        peer: tl::enums::Peer,
1491        text: &str,
1492    ) -> Result<(), InvocationError> {
1493        self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
1494    }
1495
1496    /// Send a message with full [`InputMessage`] options.
1497    pub async fn send_message_to_peer_ex(
1498        &self,
1499        peer: tl::enums::Peer,
1500        msg:  &InputMessage,
1501    ) -> Result<(), InvocationError> {
1502        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1503        let req = tl::functions::messages::SendMessage {
1504            no_webpage:               msg.no_webpage,
1505            silent:                   msg.silent,
1506            background:               msg.background,
1507            clear_draft:              msg.clear_draft,
1508            noforwards:               false,
1509            update_stickersets_order: false,
1510            invert_media:             false,
1511            allow_paid_floodskip:     false,
1512            peer:                     input_peer,
1513            reply_to:                 msg.reply_header(),
1514            message:                  msg.text.clone(),
1515            random_id:                random_i64(),
1516            reply_markup:             msg.reply_markup.clone(),
1517            entities:                 msg.entities.clone(),
1518            schedule_date:            msg.schedule_date,
1519            schedule_repeat_period:   None,
1520            send_as:                  None,
1521            quick_reply_shortcut:     None,
1522            effect:                   None,
1523            allow_paid_stars:         None,
1524            suggested_post:           None,
1525        };
1526        self.rpc_write(&req).await
1527    }
1528
1529    /// Send directly to Saved Messages.
1530    pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
1531        let req = tl::functions::messages::SendMessage {
1532            no_webpage:               false,
1533            silent:                   false,
1534            background:               false,
1535            clear_draft:              false,
1536            noforwards:               false,
1537            update_stickersets_order: false,
1538            invert_media:             false,
1539            allow_paid_floodskip:     false,
1540            peer:                     tl::enums::InputPeer::PeerSelf,
1541            reply_to:                 None,
1542            message:                  text.to_string(),
1543            random_id:                random_i64(),
1544            reply_markup:             None,
1545            entities:                 None,
1546            schedule_date:            None,
1547            schedule_repeat_period:   None,
1548            send_as:                  None,
1549            quick_reply_shortcut:     None,
1550            effect:                   None,
1551            allow_paid_stars:         None,
1552            suggested_post:           None,
1553        };
1554        self.rpc_write(&req).await
1555    }
1556
1557    /// Edit an existing message.
1558    pub async fn edit_message(
1559        &self,
1560        peer:       tl::enums::Peer,
1561        message_id: i32,
1562        new_text:   &str,
1563    ) -> Result<(), InvocationError> {
1564        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1565        let req = tl::functions::messages::EditMessage {
1566            no_webpage:    false,
1567            invert_media:  false,
1568            peer:          input_peer,
1569            id:            message_id,
1570            message:       Some(new_text.to_string()),
1571            media:         None,
1572            reply_markup:  None,
1573            entities:      None,
1574            schedule_date: None,
1575            schedule_repeat_period: None,
1576            quick_reply_shortcut_id: None,
1577        };
1578        self.rpc_write(&req).await
1579    }
1580
1581    /// Forward messages from `source` to `destination`.
1582    pub async fn forward_messages(
1583        &self,
1584        destination: tl::enums::Peer,
1585        message_ids: &[i32],
1586        source:      tl::enums::Peer,
1587    ) -> Result<(), InvocationError> {
1588        let cache = self.inner.peer_cache.lock().await;
1589        let to_peer   = cache.peer_to_input(&destination);
1590        let from_peer = cache.peer_to_input(&source);
1591        drop(cache);
1592
1593        let req = tl::functions::messages::ForwardMessages {
1594            silent:            false,
1595            background:        false,
1596            with_my_score:     false,
1597            drop_author:       false,
1598            drop_media_captions: false,
1599            noforwards:        false,
1600            from_peer:         from_peer,
1601            id:                message_ids.to_vec(),
1602            random_id:         (0..message_ids.len()).map(|_| random_i64()).collect(),
1603            to_peer:           to_peer,
1604            top_msg_id:        None,
1605            reply_to:          None,
1606            schedule_date:     None,
1607            schedule_repeat_period: None,
1608            send_as:           None,
1609            quick_reply_shortcut: None,
1610            effect:            None,
1611            video_timestamp:   None,
1612            allow_paid_stars:  None,
1613            allow_paid_floodskip: false,
1614            suggested_post:    None,
1615        };
1616        self.rpc_write(&req).await
1617    }
1618
1619    /// Delete messages by ID.
1620    pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
1621        let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
1622        self.rpc_write(&req).await
1623    }
1624
1625    /// Get messages by their IDs from a peer.
1626    pub async fn get_messages_by_id(
1627        &self,
1628        peer: tl::enums::Peer,
1629        ids:  &[i32],
1630    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1631        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1632        let id_list: Vec<tl::enums::InputMessage> = ids.iter()
1633            .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
1634            .collect();
1635        let req  = tl::functions::channels::GetMessages {
1636            channel: match &input_peer {
1637                tl::enums::InputPeer::Channel(c) =>
1638                    tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1639                        channel_id: c.channel_id, access_hash: c.access_hash
1640                    }),
1641                _ => return self.get_messages_user(input_peer, id_list).await,
1642            },
1643            id: id_list,
1644        };
1645        let body    = self.rpc_call_raw(&req).await?;
1646        let mut cur = Cursor::from_slice(&body);
1647        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1648            tl::enums::messages::Messages::Messages(m) => m.messages,
1649            tl::enums::messages::Messages::Slice(m)    => m.messages,
1650            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1651            tl::enums::messages::Messages::NotModified(_) => vec![],
1652        };
1653        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1654    }
1655
1656    async fn get_messages_user(
1657        &self,
1658        _peer: tl::enums::InputPeer,
1659        ids:   Vec<tl::enums::InputMessage>,
1660    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1661        let req = tl::functions::messages::GetMessages { id: ids };
1662        let body    = self.rpc_call_raw(&req).await?;
1663        let mut cur = Cursor::from_slice(&body);
1664        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1665            tl::enums::messages::Messages::Messages(m) => m.messages,
1666            tl::enums::messages::Messages::Slice(m)    => m.messages,
1667            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1668            tl::enums::messages::Messages::NotModified(_) => vec![],
1669        };
1670        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1671    }
1672
1673    /// Get the pinned message in a chat.
1674    pub async fn get_pinned_message(
1675        &self,
1676        peer: tl::enums::Peer,
1677    ) -> Result<Option<update::IncomingMessage>, InvocationError> {
1678        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1679        let req = tl::functions::messages::Search {
1680            peer: input_peer,
1681            q: String::new(),
1682            from_id: None,
1683            saved_peer_id: None,
1684            saved_reaction: None,
1685            top_msg_id: None,
1686            filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
1687            min_date: 0,
1688            max_date: 0,
1689            offset_id: 0,
1690            add_offset: 0,
1691            limit: 1,
1692            max_id: 0,
1693            min_id: 0,
1694            hash: 0,
1695        };
1696        let body    = self.rpc_call_raw(&req).await?;
1697        let mut cur = Cursor::from_slice(&body);
1698        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1699            tl::enums::messages::Messages::Messages(m) => m.messages,
1700            tl::enums::messages::Messages::Slice(m)    => m.messages,
1701            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1702            tl::enums::messages::Messages::NotModified(_) => vec![],
1703        };
1704        Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
1705    }
1706
1707    /// Pin a message in a chat.
1708    pub async fn pin_message(
1709        &self,
1710        peer:       tl::enums::Peer,
1711        message_id: i32,
1712        silent:     bool,
1713        unpin:      bool,
1714        pm_oneside: bool,
1715    ) -> Result<(), InvocationError> {
1716        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1717        let req = tl::functions::messages::UpdatePinnedMessage {
1718            silent,
1719            unpin,
1720            pm_oneside,
1721            peer: input_peer,
1722            id:   message_id,
1723        };
1724        self.rpc_write(&req).await
1725    }
1726
1727    /// Unpin a specific message.
1728    pub async fn unpin_message(
1729        &self,
1730        peer:       tl::enums::Peer,
1731        message_id: i32,
1732    ) -> Result<(), InvocationError> {
1733        self.pin_message(peer, message_id, true, true, false).await
1734    }
1735
1736    /// Unpin all messages in a chat.
1737    pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1738        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1739        let req = tl::functions::messages::UnpinAllMessages {
1740            peer:      input_peer,
1741            top_msg_id: None,
1742            saved_peer_id: None,
1743        };
1744        self.rpc_write(&req).await
1745    }
1746
1747    // ── Message search ─────────────────────────────────────────────────────
1748
1749    /// Search messages in a chat.
1750    pub async fn search_messages(
1751        &self,
1752        peer:  tl::enums::Peer,
1753        query: &str,
1754        limit: i32,
1755    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1756        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1757        let req = tl::functions::messages::Search {
1758            peer:         input_peer,
1759            q:            query.to_string(),
1760            from_id:      None,
1761            saved_peer_id: None,
1762            saved_reaction: None,
1763            top_msg_id:   None,
1764            filter:       tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1765            min_date:     0,
1766            max_date:     0,
1767            offset_id:    0,
1768            add_offset:   0,
1769            limit,
1770            max_id:       0,
1771            min_id:       0,
1772            hash:         0,
1773        };
1774        let body    = self.rpc_call_raw(&req).await?;
1775        let mut cur = Cursor::from_slice(&body);
1776        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1777            tl::enums::messages::Messages::Messages(m) => m.messages,
1778            tl::enums::messages::Messages::Slice(m)    => m.messages,
1779            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1780            tl::enums::messages::Messages::NotModified(_) => vec![],
1781        };
1782        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1783    }
1784
1785    /// Search messages globally across all chats.
1786    pub async fn search_global(
1787        &self,
1788        query: &str,
1789        limit: i32,
1790    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1791        let req = tl::functions::messages::SearchGlobal {
1792            broadcasts_only: false,
1793            groups_only:     false,
1794            users_only:      false,
1795            folder_id:       None,
1796            q:               query.to_string(),
1797            filter:          tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1798            min_date:        0,
1799            max_date:        0,
1800            offset_rate:     0,
1801            offset_peer:     tl::enums::InputPeer::Empty,
1802            offset_id:       0,
1803            limit,
1804        };
1805        let body    = self.rpc_call_raw(&req).await?;
1806        let mut cur = Cursor::from_slice(&body);
1807        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1808            tl::enums::messages::Messages::Messages(m) => m.messages,
1809            tl::enums::messages::Messages::Slice(m)    => m.messages,
1810            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1811            tl::enums::messages::Messages::NotModified(_) => vec![],
1812        };
1813        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1814    }
1815
1816    // ── Scheduled messages ─────────────────────────────────────────────────
1817
1818    /// Retrieve all scheduled messages in a chat.
1819    ///
1820    /// Scheduled messages are messages set to be sent at a future time using
1821    /// [`InputMessage::schedule_date`].  Returns them newest-first.
1822    ///
1823    /// # Example
1824    /// ```rust,no_run
1825    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
1826    /// let scheduled = client.get_scheduled_messages(peer).await?;
1827    /// for msg in &scheduled {
1828    ///     println!("Scheduled: {:?} at {:?}", msg.text(), msg.date());
1829    /// }
1830    /// # Ok(()) }
1831    /// ```
1832    pub async fn get_scheduled_messages(
1833        &self,
1834        peer: tl::enums::Peer,
1835    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1836        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1837        let req = tl::functions::messages::GetScheduledHistory {
1838            peer: input_peer,
1839            hash: 0,
1840        };
1841        let body    = self.rpc_call_raw(&req).await?;
1842        let mut cur = Cursor::from_slice(&body);
1843        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1844            tl::enums::messages::Messages::Messages(m)        => m.messages,
1845            tl::enums::messages::Messages::Slice(m)           => m.messages,
1846            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1847            tl::enums::messages::Messages::NotModified(_)     => vec![],
1848        };
1849        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1850    }
1851
1852    /// Delete one or more scheduled messages by their IDs.
1853    pub async fn delete_scheduled_messages(
1854        &self,
1855        peer: tl::enums::Peer,
1856        ids:  Vec<i32>,
1857    ) -> Result<(), InvocationError> {
1858        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1859        let req = tl::functions::messages::DeleteScheduledMessages {
1860            peer: input_peer,
1861            id:   ids,
1862        };
1863        self.rpc_write(&req).await
1864    }
1865
1866    // ── Callback / Inline Queries ──────────────────────────────────────────
1867
1868    pub async fn answer_callback_query(
1869        &self,
1870        query_id: i64,
1871        text:     Option<&str>,
1872        alert:    bool,
1873    ) -> Result<bool, InvocationError> {
1874        let req = tl::functions::messages::SetBotCallbackAnswer {
1875            alert,
1876            query_id,
1877            message:    text.map(|s| s.to_string()),
1878            url:        None,
1879            cache_time: 0,
1880        };
1881        let body = self.rpc_call_raw(&req).await?;
1882        Ok(!body.is_empty())
1883    }
1884
1885    pub async fn answer_inline_query(
1886        &self,
1887        query_id:    i64,
1888        results:     Vec<tl::enums::InputBotInlineResult>,
1889        cache_time:  i32,
1890        is_personal: bool,
1891        next_offset: Option<String>,
1892    ) -> Result<bool, InvocationError> {
1893        let req = tl::functions::messages::SetInlineBotResults {
1894            gallery:        false,
1895            private:        is_personal,
1896            query_id,
1897            results,
1898            cache_time,
1899            next_offset,
1900            switch_pm:      None,
1901            switch_webview: None,
1902        };
1903        let body = self.rpc_call_raw(&req).await?;
1904        Ok(!body.is_empty())
1905    }
1906
1907    // ── Dialogs ────────────────────────────────────────────────────────────
1908
1909    /// Fetch up to `limit` dialogs, most recent first. Populates entity/message.
1910    pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
1911        let req = tl::functions::messages::GetDialogs {
1912            exclude_pinned: false,
1913            folder_id:      None,
1914            offset_date:    0,
1915            offset_id:      0,
1916            offset_peer:    tl::enums::InputPeer::Empty,
1917            limit,
1918            hash:           0,
1919        };
1920
1921        let body    = self.rpc_call_raw(&req).await?;
1922        let mut cur = Cursor::from_slice(&body);
1923        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1924            tl::enums::messages::Dialogs::Dialogs(d) => d,
1925            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
1926                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1927            },
1928            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1929        };
1930
1931        // Build message map
1932        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1933            .filter_map(|m| {
1934                let id = match &m {
1935                    tl::enums::Message::Message(x) => x.id,
1936                    tl::enums::Message::Service(x) => x.id,
1937                    tl::enums::Message::Empty(x)   => x.id,
1938                };
1939                Some((id, m))
1940            })
1941            .collect();
1942
1943        // Build user map
1944        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1945            .filter_map(|u| {
1946                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1947            })
1948            .collect();
1949
1950        // Build chat map
1951        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1952            .filter_map(|c| {
1953                let id = match &c {
1954                    tl::enums::Chat::Chat(x)             => x.id,
1955                    tl::enums::Chat::Forbidden(x)    => x.id,
1956                    tl::enums::Chat::Channel(x)          => x.id,
1957                    tl::enums::Chat::ChannelForbidden(x) => x.id,
1958                    tl::enums::Chat::Empty(x)            => x.id,
1959                };
1960                Some((id, c))
1961            })
1962            .collect();
1963
1964        // Cache peers for future access_hash lookups
1965        {
1966            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1967            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1968            self.cache_users_slice(&u_list).await;
1969            self.cache_chats_slice(&c_list).await;
1970        }
1971
1972        let result = raw.dialogs.into_iter().map(|d| {
1973            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1974            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1975
1976            let message = msg_map.get(&top_id).cloned();
1977            let entity = peer.and_then(|p| match p {
1978                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1979                _ => None,
1980            });
1981            let chat = peer.and_then(|p| match p {
1982                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
1983                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1984                _ => None,
1985            });
1986
1987            Dialog { raw: d, message, entity, chat }
1988        }).collect();
1989
1990        Ok(result)
1991    }
1992
1993    /// Internal helper: fetch dialogs with a custom GetDialogs request.
1994    #[allow(dead_code)]
1995    async fn get_dialogs_raw(
1996        &self,
1997        req: tl::functions::messages::GetDialogs,
1998    ) -> Result<Vec<Dialog>, InvocationError> {
1999        let body    = self.rpc_call_raw(&req).await?;
2000        let mut cur = Cursor::from_slice(&body);
2001        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2002            tl::enums::messages::Dialogs::Dialogs(d) => d,
2003            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
2004                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2005            },
2006            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2007        };
2008
2009        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2010            .filter_map(|m| {
2011                let id = match &m {
2012                    tl::enums::Message::Message(x) => x.id,
2013                    tl::enums::Message::Service(x) => x.id,
2014                    tl::enums::Message::Empty(x)   => x.id,
2015                };
2016                Some((id, m))
2017            })
2018            .collect();
2019
2020        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2021            .filter_map(|u| {
2022                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2023            })
2024            .collect();
2025
2026        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2027            .filter_map(|c| {
2028                let id = match &c {
2029                    tl::enums::Chat::Chat(x)             => x.id,
2030                    tl::enums::Chat::Forbidden(x)    => x.id,
2031                    tl::enums::Chat::Channel(x)          => x.id,
2032                    tl::enums::Chat::ChannelForbidden(x) => x.id,
2033                    tl::enums::Chat::Empty(x)            => x.id,
2034                };
2035                Some((id, c))
2036            })
2037            .collect();
2038
2039        {
2040            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2041            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2042            self.cache_users_slice(&u_list).await;
2043            self.cache_chats_slice(&c_list).await;
2044        }
2045
2046        let result = raw.dialogs.into_iter().map(|d| {
2047            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2048            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2049
2050            let message = msg_map.get(&top_id).cloned();
2051            let entity = peer.and_then(|p| match p {
2052                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2053                _ => None,
2054            });
2055            let chat = peer.and_then(|p| match p {
2056                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
2057                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2058                _ => None,
2059            });
2060
2061            Dialog { raw: d, message, entity, chat }
2062        }).collect();
2063
2064        Ok(result)
2065    }
2066
2067    /// Like `get_dialogs_raw` but also returns the total count from `messages.DialogsSlice`.
2068    async fn get_dialogs_raw_with_count(
2069        &self,
2070        req: tl::functions::messages::GetDialogs,
2071    ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
2072        let body    = self.rpc_call_raw(&req).await?;
2073        let mut cur = Cursor::from_slice(&body);
2074        let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2075            tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
2076            tl::enums::messages::Dialogs::Slice(d)   => {
2077                let cnt = Some(d.count);
2078                (tl::types::messages::Dialogs {
2079                    dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2080                }, cnt)
2081            }
2082            tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
2083        };
2084
2085        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2086            .filter_map(|m| {
2087                let id = match &m {
2088                    tl::enums::Message::Message(x) => x.id,
2089                    tl::enums::Message::Service(x) => x.id,
2090                    tl::enums::Message::Empty(x)   => x.id,
2091                };
2092                Some((id, m))
2093            }).collect();
2094
2095        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2096            .filter_map(|u| {
2097                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2098            }).collect();
2099
2100        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2101            .filter_map(|c| {
2102                let id = match &c {
2103                    tl::enums::Chat::Chat(x)             => x.id,
2104                    tl::enums::Chat::Forbidden(x)        => x.id,
2105                    tl::enums::Chat::Channel(x)          => x.id,
2106                    tl::enums::Chat::ChannelForbidden(x) => x.id,
2107                    tl::enums::Chat::Empty(x)            => x.id,
2108                };
2109                Some((id, c))
2110            }).collect();
2111
2112        {
2113            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2114            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2115            self.cache_users_slice(&u_list).await;
2116            self.cache_chats_slice(&c_list).await;
2117        }
2118
2119        let result = raw.dialogs.into_iter().map(|d| {
2120            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2121            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2122            let message = msg_map.get(&top_id).cloned();
2123            let entity = peer.and_then(|p| match p {
2124                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2125                _ => None,
2126            });
2127            let chat = peer.and_then(|p| match p {
2128                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
2129                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2130                _ => None,
2131            });
2132            Dialog { raw: d, message, entity, chat }
2133        }).collect();
2134
2135        Ok((result, count))
2136    }
2137
2138    /// Like `get_messages` but also returns the total count from `messages.Slice`.
2139    async fn get_messages_with_count(
2140        &self,
2141        peer:      tl::enums::InputPeer,
2142        limit:     i32,
2143        offset_id: i32,
2144    ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
2145        let req = tl::functions::messages::GetHistory {
2146            peer, offset_id, offset_date: 0, add_offset: 0,
2147            limit, max_id: 0, min_id: 0, hash: 0,
2148        };
2149        let body    = self.rpc_call_raw(&req).await?;
2150        let mut cur = Cursor::from_slice(&body);
2151        let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2152            tl::enums::messages::Messages::Messages(m) => (m.messages, None),
2153            tl::enums::messages::Messages::Slice(m)    => {
2154                let cnt = Some(m.count);
2155                (m.messages, cnt)
2156            }
2157            tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
2158            tl::enums::messages::Messages::NotModified(_) => (vec![], None),
2159        };
2160        Ok((msgs.into_iter().map(update::IncomingMessage::from_raw).collect(), count))
2161    }
2162
2163    /// Download all bytes of a media attachment and save them to `path`.
2164    ///
2165    /// # Example
2166    /// ```rust,no_run
2167    /// # async fn f(client: layer_client::Client, msg: layer_client::update::IncomingMessage) -> Result<(), Box<dyn std::error::Error>> {
2168    /// if let Some(loc) = msg.download_location() {
2169    ///     client.download_media_to_file(loc, "/tmp/file.jpg").await?;
2170    /// }
2171    /// # Ok(()) }
2172    /// ```
2173    pub async fn download_media_to_file(
2174        &self,
2175        location: tl::enums::InputFileLocation,
2176        path:     impl AsRef<std::path::Path>,
2177    ) -> Result<(), InvocationError> {
2178        use tokio::io::AsyncWriteExt as _;
2179        let bytes = self.download_media(location).await?;
2180        let mut f = tokio::fs::File::create(path).await?;
2181        f.write_all(&bytes).await?;
2182        Ok(())
2183    }
2184
2185    pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2186        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2187        let req = tl::functions::messages::DeleteHistory {
2188            just_clear:  false,
2189            revoke:      false,
2190            peer:        input_peer,
2191            max_id:      0,
2192            min_date:    None,
2193            max_date:    None,
2194        };
2195        self.rpc_write(&req).await
2196    }
2197
2198    /// Mark all messages in a chat as read.
2199    pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2200        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2201        match &input_peer {
2202            tl::enums::InputPeer::Channel(c) => {
2203                let req = tl::functions::channels::ReadHistory {
2204                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2205                        channel_id: c.channel_id, access_hash: c.access_hash,
2206                    }),
2207                    max_id: 0,
2208                };
2209                self.rpc_call_raw(&req).await?;
2210            }
2211            _ => {
2212                let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
2213                self.rpc_call_raw(&req).await?;
2214            }
2215        }
2216        Ok(())
2217    }
2218
2219    /// Clear unread mention markers.
2220    pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2221        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2222        let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
2223        self.rpc_write(&req).await
2224    }
2225
2226    // ── Chat actions (typing, etc) ─────────────────────────────────────────
2227
2228    /// Send a chat action (typing indicator, uploading photo, etc).
2229    ///
2230    /// For "typing" use `tl::enums::SendMessageAction::Typing`.
2231    pub async fn send_chat_action(
2232        &self,
2233        peer:   tl::enums::Peer,
2234        action: tl::enums::SendMessageAction,
2235    ) -> Result<(), InvocationError> {
2236        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2237        let req = tl::functions::messages::SetTyping {
2238            peer: input_peer,
2239            top_msg_id: None,
2240            action,
2241        };
2242        self.rpc_write(&req).await
2243    }
2244
2245    // ── Join / invite links ────────────────────────────────────────────────
2246
2247    /// Join a public chat or channel by username/peer.
2248    pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2249        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2250        match input_peer {
2251            tl::enums::InputPeer::Channel(c) => {
2252                let req = tl::functions::channels::JoinChannel {
2253                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2254                        channel_id: c.channel_id, access_hash: c.access_hash,
2255                    }),
2256                };
2257                self.rpc_call_raw(&req).await?;
2258            }
2259            tl::enums::InputPeer::Chat(c) => {
2260                let req = tl::functions::messages::AddChatUser {
2261                    chat_id:  c.chat_id,
2262                    user_id:  tl::enums::InputUser::UserSelf,
2263                    fwd_limit: 0,
2264                };
2265                self.rpc_call_raw(&req).await?;
2266            }
2267            _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
2268        }
2269        Ok(())
2270    }
2271
2272    /// Accept and join via an invite link.
2273    pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
2274        let hash = Self::parse_invite_hash(link)
2275            .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
2276        let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
2277        self.rpc_write(&req).await
2278    }
2279
2280    /// Extract hash from `https://t.me/+HASH` or `https://t.me/joinchat/HASH`.
2281    pub fn parse_invite_hash(link: &str) -> Option<&str> {
2282        if let Some(pos) = link.find("/+") {
2283            return Some(&link[pos + 2..]);
2284        }
2285        if let Some(pos) = link.find("/joinchat/") {
2286            return Some(&link[pos + 10..]);
2287        }
2288        None
2289    }
2290
2291    // ── Message history (paginated) ────────────────────────────────────────
2292
2293    /// Fetch a page of messages from a peer's history.
2294    pub async fn get_messages(
2295        &self,
2296        peer:      tl::enums::InputPeer,
2297        limit:     i32,
2298        offset_id: i32,
2299    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2300        let req = tl::functions::messages::GetHistory {
2301            peer, offset_id, offset_date: 0, add_offset: 0,
2302            limit, max_id: 0, min_id: 0, hash: 0,
2303        };
2304        let body    = self.rpc_call_raw(&req).await?;
2305        let mut cur = Cursor::from_slice(&body);
2306        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2307            tl::enums::messages::Messages::Messages(m) => m.messages,
2308            tl::enums::messages::Messages::Slice(m)    => m.messages,
2309            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2310            tl::enums::messages::Messages::NotModified(_) => vec![],
2311        };
2312        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2313    }
2314
2315    // ── Peer resolution ────────────────────────────────────────────────────
2316
2317    /// Resolve a peer string to a [`tl::enums::Peer`].
2318    pub async fn resolve_peer(
2319        &self,
2320        peer: &str,
2321    ) -> Result<tl::enums::Peer, InvocationError> {
2322        match peer.trim() {
2323            "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
2324            username if username.starts_with('@') => {
2325                self.resolve_username(&username[1..]).await
2326            }
2327            id_str => {
2328                if let Ok(id) = id_str.parse::<i64>() {
2329                    Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
2330                } else {
2331                    Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
2332                }
2333            }
2334        }
2335    }
2336
2337    async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
2338        let req  = tl::functions::contacts::ResolveUsername {
2339            username: username.to_string(), referer: None,
2340        };
2341        let body = self.rpc_call_raw(&req).await?;
2342        let mut cur = Cursor::from_slice(&body);
2343        let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
2344            tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
2345        };
2346        // Cache users and chats from the resolution
2347        self.cache_users_slice(&resolved.users).await;
2348        self.cache_chats_slice(&resolved.chats).await;
2349        Ok(resolved.peer)
2350    }
2351
2352    // ── Raw invoke ─────────────────────────────────────────────────────────
2353
2354    /// Invoke any TL function directly, handling flood-wait retries.
2355    pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
2356        let body = self.rpc_call_raw(req).await?;
2357        let mut cur = Cursor::from_slice(&body);
2358        R::Return::deserialize(&mut cur).map_err(Into::into)
2359    }
2360
2361    async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
2362        let mut fail_count   = NonZeroU32::new(1).unwrap();
2363        let mut slept_so_far = Duration::default();
2364        loop {
2365            match self.do_rpc_call(req).await {
2366                Ok(body) => return Ok(body),
2367                Err(e) => {
2368                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
2369                    match self.inner.retry_policy.should_retry(&ctx) {
2370                        ControlFlow::Continue(delay) => {
2371                            sleep(delay).await;
2372                            slept_so_far += delay;
2373                            fail_count = fail_count.saturating_add(1);
2374                        }
2375                        ControlFlow::Break(()) => return Err(ctx.error),
2376                    }
2377                }
2378            }
2379        }
2380    }
2381
    /// Send an RPC call and await the response via a oneshot channel.
    ///
    /// This is the core of the split-stream design:
    ///  1. Pack the request and get its msg_id.
    ///  2. Register a oneshot Sender in the pending map (BEFORE sending).
    ///  3. Send the frame while holding the writer lock.
    ///  4. Release the writer lock immediately — the reader task now runs freely.
    ///  5. Await the oneshot Receiver; the reader task will fulfill it when
    ///     the matching rpc_result frame arrives.
    ///
    /// NOTE(review): on timeout the pending-map entry for `msg_id` is not
    /// removed here — presumably the reader task drops it when (if) a late
    /// response arrives; confirm there is no slow leak for lost responses.
    async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
        let (tx, rx) = oneshot::channel();
        {
            // Writer-lock scope: pack, register, send — then drop the lock.
            let mut w = self.inner.writer.lock().await;
            let (wire, msg_id) = w.enc.pack_with_msg_id(req);
            let fk = w.frame_kind.clone();
            // Insert BEFORE sending — response cannot arrive before we send, but
            // inserting first is the safe contract.
            self.inner.pending.lock().await.insert(msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
        }
        // Writer lock is released. Reader task can now freely read from the socket
        // without competing with us.
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            // Reader delivered the rpc_result body (itself a Result).
            Ok(Ok(result)) => result,
            Ok(Err(_))     => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
            Err(_)         => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
        }
    }
2410
2411    /// Like `rpc_call_raw` but for write RPCs (Serializable, return type is Updates).
2412    /// Uses the same oneshot mechanism — the reader task signals success/failure.
2413    async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
2414        let mut fail_count   = NonZeroU32::new(1).unwrap();
2415        let mut slept_so_far = Duration::default();
2416        loop {
2417            let result = self.do_rpc_write(req).await;
2418            match result {
2419                Ok(()) => return Ok(()),
2420                Err(e) => {
2421                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
2422                    match self.inner.retry_policy.should_retry(&ctx) {
2423                        ControlFlow::Continue(delay) => {
2424                            sleep(delay).await;
2425                            slept_so_far += delay;
2426                            fail_count = fail_count.saturating_add(1);
2427                        }
2428                        ControlFlow::Break(()) => return Err(ctx.error),
2429                    }
2430                }
2431            }
2432        }
2433    }
2434
    /// One attempt of a write RPC: pack + register + send under the writer
    /// lock, then await the reader task's oneshot reply. Same contract as
    /// `do_rpc_call`, but the response body is discarded.
    async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
        let (tx, rx) = oneshot::channel();
        {
            let mut w = self.inner.writer.lock().await;
            let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
            let fk = w.frame_kind.clone();
            // Register the oneshot before the frame hits the wire — the
            // reader must be able to route the response the moment it lands.
            self.inner.pending.lock().await.insert(msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
        }
        // Writer lock released; only the oneshot await remains.
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            Ok(Ok(result)) => result.map(|_| ()),  // ignore body for write RPCs
            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
            Err(_)         => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
        }
    }
2450
2451    // ── initConnection ─────────────────────────────────────────────────────
2452
    /// Run `invokeWithLayer(initConnection(help.getConfig))` on the current
    /// connection and merge the returned DC option list into our address table.
    ///
    /// Existing `DcEntry` auth keys / salts are preserved — only the address
    /// field is refreshed for DCs we already know.
    async fn init_connection(&self) -> Result<(), InvocationError> {
        use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
        // NOTE(review): device/system/app metadata is hard-coded; presumably
        // only shown in the account's active-sessions list — confirm.
        let req = InvokeWithLayer {
            layer: tl::LAYER,
            query: InitConnection {
                api_id:           self.inner.api_id,
                device_model:     "Linux".to_string(),
                system_version:   "1.0".to_string(),
                app_version:      env!("CARGO_PKG_VERSION").to_string(),
                system_lang_code: "en".to_string(),
                lang_pack:        "".to_string(),
                lang_code:        "en".to_string(),
                proxy:            None,
                params:           None,
                query:            GetConfig {},
            },
        };

        // Use the split-writer oneshot path (reader task routes the response).
        let body = self.rpc_call_raw_serializable(&req).await?;

        // A failed Config parse is deliberately ignored: initConnection itself
        // succeeded, and the DC table merely keeps its previous contents.
        let mut cur = Cursor::from_slice(&body);
        if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
            let allow_ipv6 = self.inner.allow_ipv6;
            let mut opts = self.inner.dc_options.lock().await;
            for opt in &cfg.dc_options {
                let tl::enums::DcOption::DcOption(o) = opt;
                // Skip options this client cannot use: media/CDN-only DCs,
                // TCP-obfuscated-only entries, and IPv6 when disabled.
                if o.media_only || o.cdn || o.tcpo_only { continue; }
                if o.ipv6 && !allow_ipv6 { continue; }
                let addr = format!("{}:{}", o.ip_address, o.port);
                // Insert a key-less entry for new DCs; for known DCs, update
                // only the address so saved auth keys survive.
                let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
                    dc_id: o.id, addr: addr.clone(),
                    auth_key: None, first_salt: 0, time_offset: 0,
                });
                entry.addr = addr;
            }
            log::info!("[layer] initConnection ✓  ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
        }
        Ok(())
    }
2493
2494    // ── DC migration ───────────────────────────────────────────────────────
2495
    /// Reconnect to `new_dc_id`, swap the split writer/reader onto the new
    /// connection, and re-run `initConnection` there (retrying through
    /// FLOOD_WAIT) before returning, so callers can safely retry their
    /// original request on the new DC.
    async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
        let addr = {
            let opts = self.inner.dc_options.lock().await;
            // NOTE(review): fallback address is hard-coded — presumably a
            // well-known Telegram DC; confirm it is a sensible default when
            // the target DC is missing from the options table.
            opts.get(&new_dc_id).map(|e| e.addr.clone())
                .unwrap_or_else(|| "149.154.167.51:443".to_string())
        };
        log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");

        // Reuse a previously negotiated auth key for this DC if we have one;
        // otherwise a fresh key is generated during connect.
        let saved_key = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&new_dc_id).and_then(|e| e.auth_key)
        };

        let socks5    = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();
        let conn = if let Some(key) = saved_key {
            Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
        } else {
            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
        };

        // Persist the (possibly new) auth key into the DC table.
        let new_key = conn.auth_key_bytes();
        {
            let mut opts = self.inner.dc_options.lock().await;
            let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
                dc_id: new_dc_id, addr: addr.clone(),
                auth_key: None, first_salt: 0, time_offset: 0,
            });
            entry.auth_key = Some(new_key);
        }

        // Split the new connection and replace writer + reader.
        let (new_writer, new_read, new_fk) = conn.into_writer();
        let new_ak  = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        *self.inner.writer.lock().await = new_writer;
        *self.inner.home_dc_id.lock().await = new_dc_id;

        // Hand the new read half to the reader task FIRST so it can route
        // the upcoming init_connection RPC response.
        let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

        // migrate_to() is called from user-facing methods (bot_sign_in,
        // request_login_code, sign_in) — NOT from inside the reader loop.
        // The reader task is a separate tokio task running concurrently, so
        // awaiting init_connection() here is safe: the reader is free to route
        // the RPC response while we wait. We must await before returning so
        // the caller can safely retry the original request on the new DC.
        //
        // Respect FLOOD_WAIT: if Telegram rate-limits init, wait and retry
        // rather than returning an error that would abort the whole auth flow.
        loop {
            match self.init_connection().await {
                Ok(()) => break,
                Err(InvocationError::Rpc(ref r))
                    if r.flood_wait_seconds().is_some() =>
                {
                    let secs = r.flood_wait_seconds().unwrap();
                    log::warn!(
                        "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs} — waiting"
                    );
                    // +1 s of slack over the server-requested wait.
                    sleep(Duration::from_secs(secs + 1)).await;
                }
                Err(e) => return Err(e),
            }
        }

        // Best-effort persistence; a failed save must not fail the migration.
        self.save_session().await.ok();
        log::info!("[layer] Now on DC{new_dc_id} ✓");
        Ok(())
    }
2567
2568    // ── Cache helpers ──────────────────────────────────────────────────────
2569
2570    async fn cache_user(&self, user: &tl::enums::User) {
2571        self.inner.peer_cache.lock().await.cache_user(user);
2572    }
2573
2574    async fn cache_users_slice(&self, users: &[tl::enums::User]) {
2575        let mut cache = self.inner.peer_cache.lock().await;
2576        cache.cache_users(users);
2577    }
2578
2579    async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
2580        let mut cache = self.inner.peer_cache.lock().await;
2581        cache.cache_chats(chats);
2582    }
2583
2584    // Public versions used by sub-modules (media.rs, participants.rs, pts.rs)
    /// Forwarder to the private `cache_users_slice` for sibling modules
    /// (media.rs, participants.rs, pts.rs); hidden from rustdoc because it is
    /// an internal crate seam, not public API.
    #[doc(hidden)]
    pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
        self.cache_users_slice(users).await;
    }
2589
    /// Forwarder to the private `cache_chats_slice` for sibling modules;
    /// hidden from rustdoc because it is an internal crate seam.
    #[doc(hidden)]
    pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
        self.cache_chats_slice(chats).await;
    }
2594
    /// Public RPC call for use by sub-modules.
    ///
    /// Thin forwarder to the private `rpc_call_raw` (retrying raw invoke);
    /// hidden from rustdoc because it is an internal crate seam.
    #[doc(hidden)]
    pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
        self.rpc_call_raw(req).await
    }
2600
2601    /// Like rpc_call_raw but takes a Serializable (for InvokeWithLayer wrappers).
2602    async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
2603        let mut fail_count   = NonZeroU32::new(1).unwrap();
2604        let mut slept_so_far = Duration::default();
2605        loop {
2606            match self.do_rpc_write_returning_body(req).await {
2607                Ok(body) => return Ok(body),
2608                Err(e) => {
2609                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
2610                    match self.inner.retry_policy.should_retry(&ctx) {
2611                        ControlFlow::Continue(delay) => {
2612                            sleep(delay).await;
2613                            slept_so_far += delay;
2614                            fail_count = fail_count.saturating_add(1);
2615                        }
2616                        ControlFlow::Break(()) => return Err(ctx.error),
2617                    }
2618                }
2619            }
2620        }
2621    }
2622
    /// One attempt of a Serializable RPC that keeps the response body.
    /// Same pack/register/send-under-writer-lock contract as `do_rpc_call`,
    /// but packs via `pack_serializable_with_msg_id`.
    async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
        let (tx, rx) = oneshot::channel();
        {
            let mut w = self.inner.writer.lock().await;
            let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
            let fk = w.frame_kind.clone();
            // Register the oneshot before the frame hits the wire.
            self.inner.pending.lock().await.insert(msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
        }
        match tokio::time::timeout(Duration::from_secs(30), rx).await {
            Ok(Ok(result)) => result,
            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc channel closed".into())),
            Err(_)         => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
        }
    }
2638
2639    // ── Paginated dialog iterator ──────────────────────────────────────────
2640
2641    /// Fetch dialogs page by page.
2642    ///
2643    /// Returns a [`DialogIter`] that can be advanced with [`DialogIter::next`].
2644    /// This lets you page through all dialogs without loading them all at once.
2645    ///
2646    /// # Example
2647    /// ```rust,no_run
2648    /// # async fn f(client: layer_client::Client) -> Result<(), Box<dyn std::error::Error>> {
2649    /// let mut iter = client.iter_dialogs();
2650    /// while let Some(dialog) = iter.next(&client).await? {
2651    ///     println!("{}", dialog.title());
2652    /// }
2653    /// # Ok(()) }
2654    /// ```
2655    pub fn iter_dialogs(&self) -> DialogIter {
2656        DialogIter {
2657            offset_date: 0,
2658            offset_id:   0,
2659            offset_peer: tl::enums::InputPeer::Empty,
2660            done:        false,
2661            buffer:      VecDeque::new(),
2662            total:       None,
2663        }
2664    }
2665
2666    /// Fetch messages from a peer, page by page.
2667    ///
2668    /// Returns a [`MessageIter`] that can be advanced with [`MessageIter::next`].
2669    ///
2670    /// # Example
2671    /// ```rust,no_run
2672    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
2673    /// let mut iter = client.iter_messages(peer);
2674    /// while let Some(msg) = iter.next(&client).await? {
2675    ///     println!("{:?}", msg.text());
2676    /// }
2677    /// # Ok(()) }
2678    /// ```
2679    pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
2680        MessageIter {
2681            peer,
2682            offset_id: 0,
2683            done:      false,
2684            buffer:    VecDeque::new(),
2685            total:     None,
2686        }
2687    }
2688
2689    // ── resolve_peer helper returning Result on unknown hash ───────────────
2690
2691    /// Try to resolve a peer to InputPeer, returning an error if the access_hash
2692    /// is unknown (i.e. the peer has not been seen in any prior API call).
2693    pub async fn resolve_to_input_peer(
2694        &self,
2695        peer: &tl::enums::Peer,
2696    ) -> Result<tl::enums::InputPeer, InvocationError> {
2697        let cache = self.inner.peer_cache.lock().await;
2698        match peer {
2699            tl::enums::Peer::User(u) => {
2700                if u.user_id == 0 {
2701                    return Ok(tl::enums::InputPeer::PeerSelf);
2702                }
2703                match cache.users.get(&u.user_id) {
2704                    Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
2705                        user_id: u.user_id, access_hash: hash,
2706                    })),
2707                    None => Err(InvocationError::Deserialize(format!(
2708                        "access_hash unknown for user {}; resolve via username first", u.user_id
2709                    ))),
2710                }
2711            }
2712            tl::enums::Peer::Chat(c) => {
2713                Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
2714            }
2715            tl::enums::Peer::Channel(c) => {
2716                match cache.channels.get(&c.channel_id) {
2717                    Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
2718                        channel_id: c.channel_id, access_hash: hash,
2719                    })),
2720                    None => Err(InvocationError::Deserialize(format!(
2721                        "access_hash unknown for channel {}; resolve via username first", c.channel_id
2722                    ))),
2723                }
2724            }
2725        }
2726    }
2727
2728    // ── Multi-DC pool ──────────────────────────────────────────────────────
2729
2730    /// Invoke a request on a specific DC, using the pool.
2731    ///
2732    /// If the target DC has no auth key yet, one is acquired via DH and then
2733    /// authorized via `auth.exportAuthorization` / `auth.importAuthorization`
2734    /// so the worker DC can serve user-account requests too.
2735    pub async fn invoke_on_dc<R: RemoteCall>(
2736        &self,
2737        dc_id: i32,
2738        req:   &R,
2739    ) -> Result<R::Return, InvocationError> {
2740        let body = self.rpc_on_dc_raw(dc_id, req).await?;
2741        let mut cur = Cursor::from_slice(&body);
2742        R::Return::deserialize(&mut cur).map_err(Into::into)
2743    }
2744
2745    /// Raw RPC call routed to `dc_id`, exporting auth if needed.
2746    async fn rpc_on_dc_raw<R: RemoteCall>(
2747        &self,
2748        dc_id: i32,
2749        req:   &R,
2750    ) -> Result<Vec<u8>, InvocationError> {
2751        // Check if we need to open a new connection for this DC
2752        let needs_new = {
2753            let pool = self.inner.dc_pool.lock().await;
2754            !pool.has_connection(dc_id)
2755        };
2756
2757        if needs_new {
2758            let addr = {
2759                let opts = self.inner.dc_options.lock().await;
2760                opts.get(&dc_id).map(|e| e.addr.clone())
2761                    .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
2762            };
2763
2764            let socks5    = self.inner.socks5.clone();
2765            let transport = self.inner.transport.clone();
2766            let saved_key = {
2767                let opts = self.inner.dc_options.lock().await;
2768                opts.get(&dc_id).and_then(|e| e.auth_key)
2769            };
2770
2771            let dc_conn = if let Some(key) = saved_key {
2772                dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
2773            } else {
2774                let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
2775                // Export auth from home DC and import into worker DC
2776                let home_dc_id = *self.inner.home_dc_id.lock().await;
2777                if dc_id != home_dc_id {
2778                    if let Err(e) = self.export_import_auth(dc_id, &conn).await {
2779                        log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
2780                    }
2781                }
2782                conn
2783            };
2784
2785            let key = dc_conn.auth_key_bytes();
2786            {
2787                let mut opts = self.inner.dc_options.lock().await;
2788                if let Some(e) = opts.get_mut(&dc_id) {
2789                    e.auth_key = Some(key);
2790                }
2791            }
2792            self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
2793        }
2794
2795        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
2796        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
2797    }
2798
2799    /// Export authorization from the home DC and import it into `dc_id`.
2800    async fn export_import_auth(
2801        &self,
2802        dc_id:   i32,
2803        _dc_conn: &dc_pool::DcConnection, // reserved for future direct import
2804    ) -> Result<(), InvocationError> {
2805        // Export from home DC
2806        let export_req = tl::functions::auth::ExportAuthorization { dc_id };
2807        let body    = self.rpc_call_raw(&export_req).await?;
2808        let mut cur = Cursor::from_slice(&body);
2809        let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
2810            tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
2811        };
2812
2813        // Import into the target DC via the pool
2814        let import_req = tl::functions::auth::ImportAuthorization {
2815            id:    exported.id,
2816            bytes: exported.bytes,
2817        };
2818        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
2819        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
2820        log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
2821        Ok(())
2822    }
2823
2824    // ── Private helpers ────────────────────────────────────────────────────
2825
2826    async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
2827        let body    = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
2828        let mut cur = Cursor::from_slice(&body);
2829        let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
2830            tl::enums::account::Password::Password(p) => p,
2831        };
2832        Ok(PasswordToken { password: pw })
2833    }
2834
2835    fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
2836        tl::functions::auth::SendCode {
2837            phone_number: phone.to_string(),
2838            api_id:       self.inner.api_id,
2839            api_hash:     self.inner.api_hash.clone(),
2840            settings:     tl::enums::CodeSettings::CodeSettings(
2841                tl::types::CodeSettings {
2842                    allow_flashcall: false, current_number: false, allow_app_hash: false,
2843                    allow_missed_call: false, allow_firebase: false, unknown_number: false,
2844                    logout_tokens: None, token: None, app_sandbox: None,
2845                },
2846            ),
2847        }
2848    }
2849
2850    fn extract_user_name(user: &tl::enums::User) -> String {
2851        match user {
2852            tl::enums::User::User(u) => {
2853                format!("{} {}",
2854                    u.first_name.as_deref().unwrap_or(""),
2855                    u.last_name.as_deref().unwrap_or(""))
2856                    .trim().to_string()
2857            }
2858            tl::enums::User::Empty(_) => "(unknown)".into(),
2859        }
2860    }
2861
2862    fn extract_password_params(
2863        algo: &tl::enums::PasswordKdfAlgo,
2864    ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
2865        match algo {
2866            tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
2867                Ok((&a.salt1, &a.salt2, &a.p, a.g))
2868            }
2869            _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
2870        }
2871    }
2872}
2873
2874// ─── Paginated iterators ──────────────────────────────────────────────────────
2875
/// Cursor-based iterator over dialogs. Created by [`Client::iter_dialogs`].
pub struct DialogIter {
    // Pagination cursor: date of the last dialog's top message seen.
    offset_date: i32,
    // Pagination cursor: id of the last top message seen.
    offset_id:   i32,
    // Pagination cursor: peer of the last dialog seen.
    offset_peer: tl::enums::InputPeer,
    // Set once the server returns a short (final) page; `next` then stops.
    done:        bool,
    // Dialogs fetched from the server but not yet yielded by `next`.
    buffer:      VecDeque<Dialog>,
    /// Total dialog count as reported by the first server response.
    /// `None` until the first page is fetched.
    pub total: Option<i32>,
}
2887
impl DialogIter {
    // Dialogs requested per server round-trip.
    const PAGE_SIZE: i32 = 100;

    /// Total number of dialogs as reported by the server on the first page fetch.
    ///
    /// Returns `None` before the first [`next`](Self::next) call, and `None` for
    /// accounts with fewer dialogs than `PAGE_SIZE` (where the server returns
    /// `messages.Dialogs` instead of `messages.DialogsSlice`).
    pub fn total(&self) -> Option<i32> { self.total }

    /// Fetch the next dialog. Returns `None` when all dialogs have been yielded.
    pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
        // Serve from the local buffer first; only hit the network when empty.
        if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
        if self.done { return Ok(None); }

        let req = tl::functions::messages::GetDialogs {
            exclude_pinned: false,
            folder_id:      None,
            offset_date:    self.offset_date,
            offset_id:      self.offset_id,
            offset_peer:    self.offset_peer.clone(),
            limit:          Self::PAGE_SIZE,
            hash:           0,
        };

        let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
        // Populate total from the first response (messages.DialogsSlice carries a count).
        if self.total.is_none() {
            self.total = count;
        }
        // A short (or empty) page means the server has no more dialogs.
        if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
            self.done = true;
        }

        // Prepare cursor for next page: pagination keys off the
        // (date, id, peer) triple of the last dialog's top message.
        if let Some(last) = dialogs.last() {
            self.offset_date = last.message.as_ref().map(|m| match m {
                tl::enums::Message::Message(x) => x.date,
                tl::enums::Message::Service(x) => x.date,
                _ => 0,
            }).unwrap_or(0);
            self.offset_id = last.top_message();
            if let Some(peer) = last.peer() {
                self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
            }
        }

        self.buffer.extend(dialogs);
        Ok(self.buffer.pop_front())
    }
}
2939
/// Cursor-based iterator over message history. Created by [`Client::iter_messages`].
pub struct MessageIter {
    // Chat whose history is being paged through.
    peer:      tl::enums::Peer,
    // Pagination cursor: id below which the next page is fetched.
    offset_id: i32,
    // Set once the server returns a short (final) page; `next` then stops.
    done:      bool,
    // Messages fetched from the server but not yet yielded by `next`.
    buffer:    VecDeque<update::IncomingMessage>,
    /// Total message count from the first server response (messages.Slice).
    /// `None` until the first page is fetched, `None` for `messages.Messages`
    /// (which returns an exact slice with no separate count).
    pub total: Option<i32>,
}
2951
2952impl MessageIter {
2953    const PAGE_SIZE: i32 = 100;
2954
2955    /// Total message count from the first server response.
2956    ///
2957    /// Returns `None` before the first [`next`](Self::next) call, or for chats
2958    /// where the server returns an exact (non-slice) response.
2959    pub fn total(&self) -> Option<i32> { self.total }
2960
2961    /// Fetch the next message (newest first). Returns `None` when all messages have been yielded.
2962    pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
2963        if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
2964        if self.done { return Ok(None); }
2965
2966        let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
2967        let (page, count) = client.get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
2968
2969        if self.total.is_none() { self.total = count; }
2970
2971        if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
2972            self.done = true;
2973        }
2974        if let Some(last) = page.last() {
2975            self.offset_id = last.id();
2976        }
2977
2978        self.buffer.extend(page);
2979        Ok(self.buffer.pop_front())
2980    }
2981}
2982
2983// ─── Public random helper (used by media.rs) ──────────────────────────────────
2984
2985/// Public wrapper for `random_i64` used by sub-modules.
2986#[doc(hidden)]
2987pub fn random_i64_pub() -> i64 { random_i64() }
2988
2989// ─── Connection ───────────────────────────────────────────────────────────────
2990
/// How framing bytes are sent/received on a connection.
#[derive(Clone)]
enum FrameKind {
    // 1-byte word-count header (or 0x7f + 3-byte LE count for large frames).
    Abridged,
    // 4-byte little-endian length prefix.
    Intermediate,
    // Full framing carries per-direction sequence numbers (plus CRC);
    // currently emitted in simplified length-prefixed form by `send_frame`.
    #[allow(dead_code)]
    Full { send_seqno: u32, recv_seqno: u32 },
}
2999
3000
3001// ─── Split connection types ───────────────────────────────────────────────────
3002
/// Write half of a split connection.  Held under `Mutex` in `ClientInner`.
/// Owns the `EncryptedSession` used to pack/encrypt outgoing messages.
struct ConnectionWriter {
    // TCP write half; the matching read half lives with the reader task.
    write_half: OwnedWriteHalf,
    // Packs and encrypts outgoing MTProto payloads.
    enc:        EncryptedSession,
    // Framing used when writing (must match what the reader expects).
    frame_kind: FrameKind,
}
3010
3011impl ConnectionWriter {
3012    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
3013    fn first_salt(&self)    -> i64         { self.enc.salt }
3014    fn time_offset(&self)   -> i32         { self.enc.time_offset }
3015}
3016
/// Un-split connection: owns the whole `TcpStream` plus the encrypted
/// session. Converted into a `ConnectionWriter` + read half via
/// [`Connection::into_writer`] once established.
struct Connection {
    stream:     TcpStream,
    enc:        EncryptedSession,
    frame_kind: FrameKind,
}
3022
impl Connection {
    /// Open a TCP stream, optionally via SOCKS5, and apply transport init bytes.
    async fn open_stream(
        addr:      &str,
        socks5:    Option<&crate::socks5::Socks5Config>,
        transport: &TransportKind,
    ) -> Result<(TcpStream, FrameKind), InvocationError> {
        let stream = match socks5 {
            Some(proxy) => proxy.connect(addr).await?,
            None => {
                // Let tokio do the TCP handshake properly (await until connected),
                // then apply socket2 keepalive options to the live socket.
                let stream = TcpStream::connect(addr).await
                    .map_err(InvocationError::Io)?;

                // TCP-level keepalive: OS sends probes independently of our
                // application-level pings. Catches cases where the network
                // disappears without a TCP RST (e.g. mobile data switching,
                // NAT table expiry) faster than a 15 s application ping would.
                // We use socket2 only for the setsockopt call, not for connect.
                {
                    let sock = socket2::SockRef::from(&stream);
                    let keepalive = TcpKeepalive::new()
                        .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
                        .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
                    // Probe-count option is cfg-gated off on Windows.
                    #[cfg(not(target_os = "windows"))]
                    let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
                    // Best-effort: a failed setsockopt must not abort the connect.
                    sock.set_tcp_keepalive(&keepalive).ok();
                }
                stream
            }
        };
        Self::apply_transport_init(stream, transport).await
    }

    /// Send the transport init bytes and return the stream + FrameKind.
    async fn apply_transport_init(
        mut stream: TcpStream,
        transport:  &TransportKind,
    ) -> Result<(TcpStream, FrameKind), InvocationError> {
        match transport {
            TransportKind::Abridged => {
                // Single 0xef init byte selects abridged framing server-side.
                stream.write_all(&[0xef]).await?;
                Ok((stream, FrameKind::Abridged))
            }
            TransportKind::Intermediate => {
                // Four 0xee bytes select intermediate (length-prefixed) framing.
                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
                Ok((stream, FrameKind::Intermediate))
            }
            TransportKind::Full => {
                // Full transport has no init byte
                Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
            }
            TransportKind::Obfuscated { secret } => {
                // For obfuscated we do the full handshake inside open_obfuscated,
                // then wrap back in a plain TcpStream via into_inner.
                // Since ObfuscatedStream is a different type we reuse the Abridged
                // frame logic internally — the encryption layer handles everything.
                //
                // Implementation note: We convert to Abridged after the handshake
                // because ObfuscatedStream internally already uses Abridged framing
                // with XOR applied on top.  The outer Connection just sends raw bytes.
                let mut nonce = [0u8; 64];
                getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
                // Write obfuscated handshake header
                let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
                let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
                // Stamp protocol tag into nonce[56..60]
                let mut handshake = nonce;
                handshake[56] = 0xef; handshake[57] = 0xef;
                handshake[58] = 0xef; handshake[59] = 0xef;
                // Encrypt only the tail carrying the protocol tag, as derived
                // keys require; the head is sent as generated.
                enc_cipher.apply(&mut handshake[56..]);
                stream.write_all(&handshake).await?;
                Ok((stream, FrameKind::Abridged))
            }
        }
    }

    /// Connect to `addr` and run the 3-step MTProto DH key exchange in
    /// plaintext, producing a fresh `EncryptedSession`. The entire exchange
    /// is capped at 15 seconds.
    async fn connect_raw(
        addr:      &str,
        socks5:    Option<&crate::socks5::Socks5Config>,
        transport: &TransportKind,
    ) -> Result<Self, InvocationError> {
        log::info!("[layer] Connecting to {addr} (DH) …");

        // Wrap the entire DH handshake in a timeout so a silent server
        // response (e.g. a mis-framed transport error) never causes an
        // infinite hang.
        let addr2      = addr.to_string();
        let socks5_c   = socks5.cloned();
        let transport_c = transport.clone();

        let fut = async move {
            let (mut stream, frame_kind) =
                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;

            let mut plain = Session::new();

            // Step 1: req_pq → resPQ.
            let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
            let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;

            // Step 2: req_DH_params → server DH params.
            let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
            let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;

            // Step 3: set_client_DH_params → dh_gen answer.
            let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
            let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;

            // Derive the auth key, first salt and clock offset.
            let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            log::info!("[layer] DH complete ✓");

            Ok::<Self, InvocationError>(Self {
                stream,
                enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
                frame_kind,
            })
        };

        tokio::time::timeout(Duration::from_secs(15), fut)
            .await
            .map_err(|_| InvocationError::Deserialize(
                format!("DH handshake with {addr} timed out after 15 s")
            ))?
    }

    /// Reconnect to `addr` reusing a previously negotiated auth key —
    /// no DH round-trips, just TCP connect + transport init (15 s cap).
    async fn connect_with_key(
        addr:        &str,
        auth_key:    [u8; 256],
        first_salt:  i64,
        time_offset: i32,
        socks5:      Option<&crate::socks5::Socks5Config>,
        transport:   &TransportKind,
    ) -> Result<Self, InvocationError> {
        let addr2       = addr.to_string();
        let socks5_c    = socks5.cloned();
        let transport_c = transport.clone();

        let fut = async move {
            let (stream, frame_kind) =
                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
            Ok::<Self, InvocationError>(Self {
                stream,
                enc: EncryptedSession::new(auth_key, first_salt, time_offset),
                frame_kind,
            })
        };

        tokio::time::timeout(Duration::from_secs(15), fut)
            .await
            .map_err(|_| InvocationError::Deserialize(
                format!("connect_with_key to {addr} timed out after 15 s")
            ))?
    }

    /// 256-byte auth key of this connection's session.
    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }

    /// Split into a write-only `ConnectionWriter` and the TCP read half.
    fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
        let (read_half, write_half) = self.stream.into_split();
        let writer = ConnectionWriter {
            write_half,
            enc:        self.enc,
            frame_kind: self.frame_kind.clone(),
        };
        (writer, read_half, self.frame_kind)
    }
}
3192
3193// ─── Transport framing (multi-kind) ──────────────────────────────────────────
3194
3195/// Send a framed message using the active transport kind.
3196async fn send_frame(
3197    stream: &mut TcpStream,
3198    data:   &[u8],
3199    kind:   &FrameKind,
3200) -> Result<(), InvocationError> {
3201    match kind {
3202        FrameKind::Abridged => send_abridged(stream, data).await,
3203        FrameKind::Intermediate => {
3204            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3205            stream.write_all(data).await?;
3206            Ok(())
3207        }
3208        FrameKind::Full { .. } => {
3209            // seqno and CRC handled inside Connection; here we just prefix length
3210            // Full framing: [total_len 4B][seqno 4B][payload][crc32 4B]
3211            // But send_frame is called with already-encrypted payload.
3212            // We use a simplified approach: emit the same as Intermediate for now
3213            // and note that Full's seqno/CRC are transport-level, not app-level.
3214            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3215            stream.write_all(data).await?;
3216            Ok(())
3217        }
3218    }
3219}
3220
3221// ─── Split-reader helpers ─────────────────────────────────────────────────────
3222
/// Outcome of a timed frame read attempt.
enum FrameOutcome {
    // A complete frame was read successfully.
    Frame(Vec<u8>),
    // The read (or the keepalive write) failed; caller should reconnect.
    Error(InvocationError),
    Keepalive,  // timeout elapsed but ping was sent; caller should loop
}
3229
/// Read one frame with a 60-second keepalive timeout (PING_DELAY_SECS).
///
/// If the timeout fires we send a `PingDelayDisconnect` — this tells Telegram
/// to forcibly close the connection after `NO_PING_DISCONNECT` seconds of
/// silence, giving us a clean EOF to detect rather than a silently stale socket.
/// That mirrors what both grammers and the official Telegram clients do.
///
/// `_ak` is currently unused (kept for signature stability).
async fn recv_frame_with_keepalive(
    rh:      &mut OwnedReadHalf,
    fk:      &FrameKind,
    client:  &Client,
    _ak:     &[u8; 256],
) -> FrameOutcome {
    match tokio::time::timeout(Duration::from_secs(PING_DELAY_SECS), recv_frame_read(rh, fk)).await {
        Ok(Ok(raw)) => FrameOutcome::Frame(raw),
        Ok(Err(e))  => FrameOutcome::Error(e),
        Err(_) => {
            // Keepalive timeout: send PingDelayDisconnect so Telegram closes the
            // connection cleanly (EOF) if it hears nothing for NO_PING_DISCONNECT
            // seconds, rather than leaving a silently stale socket.
            let ping_req = tl::functions::PingDelayDisconnect {
                ping_id:          random_i64(),
                disconnect_delay: NO_PING_DISCONNECT,
            };
            let mut w = client.inner.writer.lock().await;
            let wire = w.enc.pack(&ping_req);
            // NOTE: shadows the `fk` parameter — the ping goes out with the
            // *writer's* frame kind, which is the one the writer half uses.
            let fk = w.frame_kind.clone();
            match send_frame_write(&mut w.write_half, &wire, &fk).await {
                Ok(()) => FrameOutcome::Keepalive,
                // If the write itself fails the connection is already dead.
                // Return Error so the reader immediately enters the reconnect loop
                // instead of sitting silent for another PING_DELAY_SECS.
                Err(e) => FrameOutcome::Error(e),
            }
        }
    }
}
3266
3267/// Send a framed message via an OwnedWriteHalf (split connection).
3268async fn send_frame_write(
3269    stream: &mut OwnedWriteHalf,
3270    data:   &[u8],
3271    kind:   &FrameKind,
3272) -> Result<(), InvocationError> {
3273    match kind {
3274        FrameKind::Abridged => {
3275            let words = data.len() / 4;
3276            if words < 0x7f {
3277                stream.write_all(&[words as u8]).await?;
3278            } else {
3279                let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
3280                stream.write_all(&b).await?;
3281            }
3282            stream.write_all(data).await?;
3283            Ok(())
3284        }
3285        FrameKind::Intermediate | FrameKind::Full { .. } => {
3286            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3287            stream.write_all(data).await?;
3288            Ok(())
3289        }
3290    }
3291}
3292
3293/// Receive a framed message via an OwnedReadHalf (split connection).
3294async fn recv_frame_read(
3295    stream: &mut OwnedReadHalf,
3296    kind:   &FrameKind,
3297) -> Result<Vec<u8>, InvocationError> {
3298    match kind {
3299        FrameKind::Abridged => {
3300            let mut h = [0u8; 1];
3301            stream.read_exact(&mut h).await?;
3302            let words = if h[0] < 0x7f {
3303                h[0] as usize
3304            } else {
3305                let mut b = [0u8; 3];
3306                stream.read_exact(&mut b).await?;
3307                b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
3308            };
3309            let len = words * 4;
3310            let mut buf = vec![0u8; len];
3311            stream.read_exact(&mut buf).await?;
3312            Ok(buf)
3313        }
3314        FrameKind::Intermediate | FrameKind::Full { .. } => {
3315            let mut len_buf = [0u8; 4];
3316            stream.read_exact(&mut len_buf).await?;
3317            let len = u32::from_le_bytes(len_buf) as usize;
3318            let mut buf = vec![0u8; len];
3319            stream.read_exact(&mut buf).await?;
3320            Ok(buf)
3321        }
3322    }
3323}
3324
3325
3326/// Send using Abridged framing (used for DH plaintext during connect).
3327async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
3328    let words = data.len() / 4;
3329    if words < 0x7f {
3330        stream.write_all(&[words as u8]).await?;
3331    } else {
3332        let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
3333        stream.write_all(&b).await?;
3334    }
3335    stream.write_all(data).await?;
3336    Ok(())
3337}
3338
/// Receive one Abridged-framed message from `stream`.
///
/// Handles both header forms (1-byte word count, or `0x7f` + 3-byte LE
/// count), surfaces Telegram's 4-byte transport-error frames as `RpcError`,
/// and rejects implausible word counts to avoid hanging on framing desync.
async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
    let mut h = [0u8; 1];
    stream.read_exact(&mut h).await?;
    let words = if h[0] < 0x7f {
        // Short form: header byte is the word count itself.
        h[0] as usize
    } else {
        // Extended form: next 3 bytes are the little-endian word count.
        let mut b = [0u8; 3];
        stream.read_exact(&mut b).await?;
        let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
        // word count of 1 after 0xFF = Telegram 4-byte transport error code
        if w == 1 {
            let mut code_buf = [0u8; 4];
            stream.read_exact(&mut code_buf).await?;
            let code = i32::from_le_bytes(code_buf);
            return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
        }
        w
    };
    // Guard against implausibly large reads — a raw 4-byte transport error
    // whose first byte was mis-read as a word count causes a hang otherwise.
    if words == 0 || words > 0x8000 {
        return Err(InvocationError::Deserialize(
            format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
        ));
    }
    let mut buf = vec![0u8; words * 4];
    stream.read_exact(&mut buf).await?;
    Ok(buf)
}
3368
3369/// Receive a plaintext (pre-auth) frame and deserialize it.
3370async fn recv_frame_plain<T: Deserializable>(
3371    stream: &mut TcpStream,
3372    _kind:  &FrameKind,
3373) -> Result<T, InvocationError> {
3374    let raw = recv_abridged(stream).await?; // DH always uses abridged for plaintext
3375    if raw.len() < 20 {
3376        return Err(InvocationError::Deserialize("plaintext frame too short".into()));
3377    }
3378    if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
3379        return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
3380    }
3381    let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
3382    let mut cur  = Cursor::from_slice(&raw[20..20 + body_len]);
3383    T::deserialize(&mut cur).map_err(Into::into)
3384}
3385
3386// ─── MTProto envelope ─────────────────────────────────────────────────────────
3387
/// What `unwrap_envelope` extracted from one decrypted MTProto message body.
enum EnvelopeResult {
    // The RPC result payload a pending request is waiting for.
    Payload(Vec<u8>),
    // Parsed update objects to feed into the update stream.
    Updates(Vec<update::Update>),
    // Service message with nothing to surface to the caller.
    None,
}
3393
/// Recursively unwrap one decrypted MTProto message body.
///
/// Handles `rpc_result` (strips the 12-byte header and recurses),
/// `rpc_error` (converted to `InvocationError::Rpc`), `msg_container`
/// (recurses into each inner message), `gzip_packed` (inflates and
/// recurses), known service constructors (silenced), and update
/// constructors (parsed into `Update`s). Any other constructor id is
/// assumed to be the raw RPC payload itself.
fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
    if body.len() < 4 {
        return Err(InvocationError::Deserialize("body < 4 bytes".into()));
    }
    // Constructor id is the first 4 bytes, little-endian.
    let cid = u32::from_le_bytes(body[..4].try_into().unwrap());

    match cid {
        ID_RPC_RESULT => {
            if body.len() < 12 {
                return Err(InvocationError::Deserialize("rpc_result too short".into()));
            }
            // Skip cid (4) + req_msg_id (8), then unwrap the inner body.
            unwrap_envelope(body[12..].to_vec())
        }
        ID_RPC_ERROR => {
            if body.len() < 8 {
                return Err(InvocationError::Deserialize("rpc_error too short".into()));
            }
            let code    = i32::from_le_bytes(body[4..8].try_into().unwrap());
            let message = tl_read_string(&body[8..]).unwrap_or_default();
            Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
        }
        ID_MSG_CONTAINER => {
            if body.len() < 8 {
                return Err(InvocationError::Deserialize("container too short".into()));
            }
            let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
            let mut pos = 8usize;
            let mut payload: Option<Vec<u8>> = None;
            let mut updates_buf: Vec<update::Update> = Vec::new();

            for _ in 0..count {
                // Each entry: msg_id (8) + seqno (4) + length (4) + body.
                if pos + 16 > body.len() { break; }
                let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
                pos += 16;
                if pos + inner_len > body.len() { break; }
                let inner = body[pos..pos + inner_len].to_vec();
                pos += inner_len;
                // Collect updates from every entry; if several entries carry
                // RPC payloads, only the last one is kept.
                match unwrap_envelope(inner)? {
                    EnvelopeResult::Payload(p)  => { payload = Some(p); }
                    EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
                    EnvelopeResult::None        => {}
                }
            }
            if let Some(p) = payload {
                Ok(EnvelopeResult::Payload(p))
            } else if !updates_buf.is_empty() {
                Ok(EnvelopeResult::Updates(updates_buf))
            } else {
                Ok(EnvelopeResult::None)
            }
        }
        ID_GZIP_PACKED => {
            // Payload is a TL byte string holding the gzipped inner body.
            let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
            unwrap_envelope(gz_inflate(&bytes)?)
        }
        // MTProto service messages — silently acknowledged, no payload extracted.
        // NOTE: ID_PONG is intentionally NOT listed here. Pong arrives as a bare
        // top-level frame (never inside rpc_result), so it is handled in route_frame
        // directly. Silencing it here would drop it before invoke() can resolve it.
        ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
        // These are correctly silenced (grammers silences these too)
        | 0xd33b5459  // MsgsStateReq
        | 0x04deb57d  // MsgsStateInfo
        | 0x8cc0d131  // MsgsAllInfo
        | 0x276d3ec6  // MsgDetailedInfo
        | 0x809db6df  // MsgNewDetailedInfo
        | 0x7d861a08  // MsgResendReq / MsgResendAnsReq
        | 0x0949d9dc  // FutureSalt
        | 0xae500895  // FutureSalts
        | 0x9299359f  // HttpWait
        | 0xe22045fc  // DestroySessionOk
        | 0x62d350c9  // DestroySessionNone
        => {
            Ok(EnvelopeResult::None)
        }
        ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
        | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
        | ID_UPDATES_TOO_LONG => {
            Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
        }
        _ => Ok(EnvelopeResult::Payload(body)),
    }
}
3477
3478// ─── Utilities ────────────────────────────────────────────────────────────────
3479
3480fn random_i64() -> i64 {
3481    let mut b = [0u8; 8];
3482    getrandom::getrandom(&mut b).expect("getrandom");
3483    i64::from_le_bytes(b)
3484}
3485
3486/// Apply ±20 % random jitter to a backoff delay.
3487/// Prevents thundering-herd when many clients reconnect simultaneously
3488/// (e.g. after a server restart or a shared network outage).
3489fn jitter_delay(base_ms: u64) -> Duration {
3490    // Use two random bytes for the jitter factor (0..=65535 → 0.80 … 1.20).
3491    let mut b = [0u8; 2];
3492    getrandom::getrandom(&mut b).unwrap_or(());
3493    let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; // 0.0 … 1.0
3494    let factor    = 0.80 + rand_frac * 0.40;                 // 0.80 … 1.20
3495    Duration::from_millis((base_ms as f64 * factor) as u64)
3496}
3497
/// Decode a TL-serialized byte string from the front of `data`.
///
/// TL uses a single-byte length prefix for strings shorter than 254 bytes,
/// and a `0xFE` marker followed by a 24-bit little-endian length otherwise.
/// Returns `None` when `data` is too short for the declared length;
/// empty input decodes to an empty byte string.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let (&first, rest) = match data.split_first() {
        Some(pair) => pair,
        None => return Some(Vec::new()),
    };
    let (len, payload) = if first < 254 {
        // Short form: the first byte is the length itself.
        (first as usize, rest)
    } else if rest.len() >= 3 {
        // Long form: 24-bit little-endian length after the marker byte.
        let len = rest[0] as usize | (rest[1] as usize) << 8 | (rest[2] as usize) << 16;
        (len, &rest[3..])
    } else {
        return None;
    };
    payload.get(..len).map(|b| b.to_vec())
}
3507
3508fn tl_read_string(data: &[u8]) -> Option<String> {
3509    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
3510}
3511
3512fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
3513    use std::io::Read;
3514    let mut out = Vec::new();
3515    if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
3516        return Ok(out);
3517    }
3518    out.clear();
3519    flate2::read::ZlibDecoder::new(data)
3520        .read_to_end(&mut out)
3521        .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
3522    Ok(out)
3523}