Skip to main content

layer_client/
lib.rs

1//! # layer-client
2//!
3//! Production-grade async Telegram client built on MTProto.
4//!
5//! ## Features
6//! - User login (phone + code + 2FA SRP) and bot token login
7//! - Peer access-hash caching — API calls always carry correct access hashes
8//! - `FLOOD_WAIT` auto-retry with configurable policy
9//! - Typed async update stream: `NewMessage`, `MessageEdited`, `MessageDeleted`,
10//!   `CallbackQuery`, `InlineQuery`, `InlineSend`, `Raw`
11//! - Send / edit / delete / forward / pin messages
12//! - Search messages (per-chat and global)
13//! - Mark as read, delete dialogs, clear mentions
14//! - Join chat / accept invite links
15//! - Chat action (typing, uploading, …)
16//! - `get_me()` — fetch own User info
17//! - Paginated dialog and message iterators
18//! - DC migration, session persistence, reconnect
19
20#![deny(unsafe_code)]
21
22mod errors;
23mod retry;
24mod session;
25mod transport;
26mod two_factor_auth;
27pub mod update;
28pub mod parsers;
29pub mod media;
30pub mod participants;
31pub mod pts;
32
33// ── New feature modules ───────────────────────────────────────────────────────
34pub mod dc_pool;
35pub mod transport_obfuscated;
36pub mod transport_intermediate;
37pub mod socks5;
38pub mod session_backend;
39pub mod inline_iter;
40pub mod typing_guard;
41
42pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
43pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
44pub use update::Update;
45pub use media::{UploadedFile, DownloadIter};
46pub use participants::Participant;
47pub use typing_guard::TypingGuard;
48pub use socks5::Socks5Config;
49pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
50
51use std::collections::HashMap;
52use std::collections::VecDeque;
53use std::num::NonZeroU32;
54use std::ops::ControlFlow;
55use std::sync::Arc;
56use std::time::Duration;
57
58use layer_tl_types as tl;
59use layer_mtproto::{EncryptedSession, Session, authentication as auth};
60use layer_tl_types::{Cursor, Deserializable, RemoteCall};
61use session::{DcEntry, PersistedSession};
62use tokio::io::{AsyncReadExt, AsyncWriteExt};
63use tokio::net::TcpStream;
64use tokio::sync::{mpsc, oneshot, Mutex};
65use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
66use tokio::time::sleep;
67
68// ─── MTProto envelope constructor IDs ────────────────────────────────────────
69
70const ID_RPC_RESULT:       u32 = 0xf35c6d01;
71const ID_RPC_ERROR:        u32 = 0x2144ca19;
72const ID_MSG_CONTAINER:    u32 = 0x73f1f8dc;
73const ID_GZIP_PACKED:      u32 = 0x3072cfa1;
74const ID_PONG:             u32 = 0x347773c5;
75const ID_MSGS_ACK:         u32 = 0x62d6b459;
76const ID_BAD_SERVER_SALT:  u32 = 0xedab447b;
77const ID_NEW_SESSION:      u32 = 0x9ec20908;
78const ID_BAD_MSG_NOTIFY:   u32 = 0xa7eff811;
79const ID_UPDATES:          u32 = 0x74ae4240;
80const ID_UPDATE_SHORT:     u32 = 0x78d4dec1;
81const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
82const ID_UPDATE_SHORT_MSG:      u32 = 0x313bc7f8;
83const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
84const ID_UPDATES_TOO_LONG:      u32 = 0xe317af7e;
85
86// ─── PeerCache ────────────────────────────────────────────────────────────────
87
88/// Caches access hashes for users and channels so every API call carries the
89/// correct hash without re-resolving peers.
90#[derive(Default)]
91pub(crate) struct PeerCache {
92    /// user_id → access_hash
93    pub(crate) users:    HashMap<i64, i64>,
94    /// channel_id → access_hash
95    pub(crate) channels: HashMap<i64, i64>,
96}
97
98impl PeerCache {
99    fn cache_user(&mut self, user: &tl::enums::User) {
100        if let tl::enums::User::User(u) = user {
101            if let Some(hash) = u.access_hash {
102                self.users.insert(u.id, hash);
103            }
104        }
105    }
106
107    fn cache_chat(&mut self, chat: &tl::enums::Chat) {
108        match chat {
109            tl::enums::Chat::Channel(c) => {
110                if let Some(hash) = c.access_hash {
111                    self.channels.insert(c.id, hash);
112                }
113            }
114            tl::enums::Chat::ChannelForbidden(c) => {
115                self.channels.insert(c.id, c.access_hash);
116            }
117            _ => {}
118        }
119    }
120
121    fn cache_users(&mut self, users: &[tl::enums::User]) {
122        for u in users { self.cache_user(u); }
123    }
124
125    fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
126        for c in chats { self.cache_chat(c); }
127    }
128
129    fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
130        if user_id == 0 {
131            return tl::enums::InputPeer::PeerSelf;
132        }
133        let hash = self.users.get(&user_id).copied().unwrap_or(0);
134        tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
135    }
136
137    fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
138        let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
139        tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
140    }
141
142    fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
143        match peer {
144            tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
145            tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
146                tl::types::InputPeerChat { chat_id: c.chat_id }
147            ),
148            tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
149        }
150    }
151}
152
153// ─── InputMessage builder ─────────────────────────────────────────────────────
154
155/// Builder for composing outgoing messages.
156///
157/// ```rust,no_run
158/// use layer_client::InputMessage;
159///
160/// let msg = InputMessage::text("Hello, *world*!")
161///     .silent(true)
162///     .reply_to(Some(42));
163/// ```
164#[derive(Clone, Default)]
165pub struct InputMessage {
166    pub text:         String,
167    pub reply_to:     Option<i32>,
168    pub silent:       bool,
169    pub background:   bool,
170    pub clear_draft:  bool,
171    pub no_webpage:   bool,
172    pub entities:     Option<Vec<tl::enums::MessageEntity>>,
173    pub reply_markup: Option<tl::enums::ReplyMarkup>,
174    pub schedule_date: Option<i32>,
175}
176
177impl InputMessage {
178    /// Create a message with the given text.
179    pub fn text(text: impl Into<String>) -> Self {
180        Self { text: text.into(), ..Default::default() }
181    }
182
183    /// Set the message text.
184    pub fn set_text(mut self, text: impl Into<String>) -> Self {
185        self.text = text.into(); self
186    }
187
188    /// Reply to a specific message ID.
189    pub fn reply_to(mut self, id: Option<i32>) -> Self {
190        self.reply_to = id; self
191    }
192
193    /// Send silently (no notification sound).
194    pub fn silent(mut self, v: bool) -> Self {
195        self.silent = v; self
196    }
197
198    /// Send in background.
199    pub fn background(mut self, v: bool) -> Self {
200        self.background = v; self
201    }
202
203    /// Clear the draft after sending.
204    pub fn clear_draft(mut self, v: bool) -> Self {
205        self.clear_draft = v; self
206    }
207
208    /// Disable link preview.
209    pub fn no_webpage(mut self, v: bool) -> Self {
210        self.no_webpage = v; self
211    }
212
213    /// Attach formatting entities (bold, italic, code, links, etc).
214    pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
215        self.entities = Some(e); self
216    }
217
218    /// Attach a reply markup (inline or reply keyboard).
219    pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
220        self.reply_markup = Some(rm); self
221    }
222
223    /// Schedule the message for a future Unix timestamp.
224    pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
225        self.schedule_date = ts; self
226    }
227
228    fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
229        self.reply_to.map(|id| {
230            tl::enums::InputReplyTo::Message(
231                tl::types::InputReplyToMessage {
232                    reply_to_msg_id: id,
233                    top_msg_id: None,
234                    reply_to_peer_id: None,
235                    quote_text: None,
236                    quote_entities: None,
237                    quote_offset: None,
238                    monoforum_peer_id: None,
239                    todo_item_id: None,
240                }
241            )
242        })
243    }
244}
245
246impl From<&str> for InputMessage {
247    fn from(s: &str) -> Self { Self::text(s) }
248}
249
250impl From<String> for InputMessage {
251    fn from(s: String) -> Self { Self::text(s) }
252}
253
254// ─── TransportKind ────────────────────────────────────────────────────────────
255
256/// Which MTProto transport framing to use for all connections.
257///
258/// | Variant | Init bytes | Notes |
259/// |---------|-----------|-------|
260/// | `Abridged` | `0xef` | Default, smallest overhead |
261/// | `Intermediate` | `0xeeeeeeee` | Better proxy compat |
262/// | `Full` | none | Adds seqno + CRC32 |
263/// | `Obfuscated` | random 64B | Bypasses DPI / MTProxy |
264#[derive(Clone, Debug, Default)]
265pub enum TransportKind {
266    /// MTProto [Abridged] transport — length prefix is 1 or 4 bytes.
267    ///
268    /// [Abridged]: https://core.telegram.org/mtproto/mtproto-transports#abridged
269    #[default]
270    Abridged,
271    /// MTProto [Intermediate] transport — 4-byte LE length prefix.
272    ///
273    /// [Intermediate]: https://core.telegram.org/mtproto/mtproto-transports#intermediate
274    Intermediate,
275    /// MTProto [Full] transport — 4-byte length + seqno + CRC32.
276    ///
277    /// [Full]: https://core.telegram.org/mtproto/mtproto-transports#full
278    Full,
279    /// [Obfuscated2] transport — XOR stream cipher over Abridged framing.
280    /// Required for MTProxy and networks with deep-packet inspection.
281    ///
282    /// `secret` is the 16-byte proxy secret, or `None` for keyless obfuscation.
283    ///
284    /// [Obfuscated2]: https://core.telegram.org/mtproto/mtproto-transports#obfuscated-2
285    Obfuscated { secret: Option<[u8; 16]> },
286}
287
288// ─── Config ───────────────────────────────────────────────────────────────────
289
290/// Configuration for [`Client::connect`].
291#[derive(Clone)]
292pub struct Config {
293    pub api_id:         i32,
294    pub api_hash:       String,
295    pub dc_addr:        Option<String>,
296    pub retry_policy:   Arc<dyn RetryPolicy>,
297    /// Optional SOCKS5 proxy — every Telegram connection is tunnelled through it.
298    pub socks5:         Option<crate::socks5::Socks5Config>,
299    /// Allow IPv6 DC addresses when populating the DC table (default: false).
300    pub allow_ipv6:     bool,
301    /// Which MTProto transport framing to use (default: Abridged).
302    pub transport:      TransportKind,
303    /// Session persistence backend (default: binary file `"layer.session"`).
304    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
305}
306
307impl Default for Config {
308    fn default() -> Self {
309        Self {
310            api_id:          0,
311            api_hash:        String::new(),
312            dc_addr:         None,
313            retry_policy:    Arc::new(AutoSleep::default()),
314            socks5:          None,
315            allow_ipv6:      false,
316            transport:       TransportKind::Abridged,
317            session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
318        }
319    }
320}
321
322// ─── UpdateStream ─────────────────────────────────────────────────────────────
323
324/// Asynchronous stream of [`Update`]s.
325pub struct UpdateStream {
326    rx: mpsc::UnboundedReceiver<update::Update>,
327}
328
329impl UpdateStream {
330    /// Wait for the next update. Returns `None` when the client has disconnected.
331    pub async fn next(&mut self) -> Option<update::Update> {
332        self.rx.recv().await
333    }
334}
335
336// ─── Dialog ───────────────────────────────────────────────────────────────────
337
338/// A Telegram dialog (chat, user, channel).
339#[derive(Debug, Clone)]
340pub struct Dialog {
341    pub raw:     tl::enums::Dialog,
342    pub message: Option<tl::enums::Message>,
343    pub entity:  Option<tl::enums::User>,
344    pub chat:    Option<tl::enums::Chat>,
345}
346
347impl Dialog {
348    /// The dialog's display title.
349    pub fn title(&self) -> String {
350        if let Some(tl::enums::User::User(u)) = &self.entity {
351            let first = u.first_name.as_deref().unwrap_or("");
352            let last  = u.last_name.as_deref().unwrap_or("");
353            let name  = format!("{first} {last}").trim().to_string();
354            if !name.is_empty() { return name; }
355        }
356        if let Some(chat) = &self.chat {
357            return match chat {
358                tl::enums::Chat::Chat(c)         => c.title.clone(),
359                tl::enums::Chat::Forbidden(c) => c.title.clone(),
360                tl::enums::Chat::Channel(c)      => c.title.clone(),
361                tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
362                tl::enums::Chat::Empty(_)        => "(empty)".into(),
363            };
364        }
365        "(Unknown)".to_string()
366    }
367
368    /// Peer of this dialog.
369    pub fn peer(&self) -> Option<&tl::enums::Peer> {
370        match &self.raw {
371            tl::enums::Dialog::Dialog(d) => Some(&d.peer),
372            tl::enums::Dialog::Folder(_) => None,
373        }
374    }
375
376    /// Unread message count.
377    pub fn unread_count(&self) -> i32 {
378        match &self.raw {
379            tl::enums::Dialog::Dialog(d) => d.unread_count,
380            _ => 0,
381        }
382    }
383
384    /// ID of the top message.
385    pub fn top_message(&self) -> i32 {
386        match &self.raw {
387            tl::enums::Dialog::Dialog(d) => d.top_message,
388            _ => 0,
389        }
390    }
391}
392
393// ─── ClientInner ─────────────────────────────────────────────────────────────
394
395struct ClientInner {
396    /// Write half of the connection — holds the EncryptedSession (for packing)
397    /// and the send half of the TCP stream. The read half is owned by the
398    /// reader task started in connect().
399    writer:          Mutex<ConnectionWriter>,
400    /// Pending RPC replies, keyed by MTProto msg_id.
401    /// RPC callers insert a oneshot::Sender here before sending; the reader
402    /// task routes incoming rpc_result frames to the matching sender.
403    pending:         Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
404    /// Channel used to hand a new (OwnedReadHalf, FrameKind, auth_key, session_id)
405    /// to the reader task after a reconnect.
406    reconnect_tx:    mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
407    home_dc_id:      Mutex<i32>,
408    dc_options:      Mutex<HashMap<i32, DcEntry>>,
409    pub(crate) peer_cache:    Mutex<PeerCache>,
410    pub(crate) pts_state:     Mutex<pts::PtsState>,
411    api_id:          i32,
412    api_hash:        String,
413    retry_policy:    Arc<dyn RetryPolicy>,
414    socks5:          Option<crate::socks5::Socks5Config>,
415    allow_ipv6:      bool,
416    transport:       TransportKind,
417    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
418    dc_pool:         Mutex<dc_pool::DcPool>,
419    update_tx:       mpsc::UnboundedSender<update::Update>,
420}
421
422/// The main Telegram client. Cheap to clone — internally Arc-wrapped.
423#[derive(Clone)]
424pub struct Client {
425    pub(crate) inner: Arc<ClientInner>,
426    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
427}
428
429impl Client {
430    // ── Connect ────────────────────────────────────────────────────────────
431
432    pub async fn connect(config: Config) -> Result<Self, InvocationError> {
433        let (update_tx, update_rx) = mpsc::unbounded_channel();
434
435        // ── Load or fresh-connect ───────────────────────────────────────
436        let socks5    = config.socks5.clone();
437        let transport = config.transport.clone();
438
439        let (conn, home_dc_id, dc_opts) =
440            match config.session_backend.load()
441                .map_err(InvocationError::Io)?
442            {
443                Some(s) => {
444                    if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
445                        if let Some(key) = dc.auth_key {
446                            log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
447                            match Connection::connect_with_key(
448                                &dc.addr, key, dc.first_salt, dc.time_offset,
449                                socks5.as_ref(), &transport,
450                            ).await {
451                                Ok(c) => {
452                                    let mut opts = session::default_dc_addresses()
453                                        .into_iter()
454                                        .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
455                                        .collect::<HashMap<_, _>>();
456                                    for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
457                                    (c, s.home_dc_id, opts)
458                                }
459                                Err(e) => {
460                                    log::warn!("[layer] Session connect failed ({e}), fresh connect …");
461                                    Self::fresh_connect(socks5.as_ref(), &transport).await?
462                                }
463                            }
464                        } else {
465                            Self::fresh_connect(socks5.as_ref(), &transport).await?
466                        }
467                    } else {
468                        Self::fresh_connect(socks5.as_ref(), &transport).await?
469                    }
470                }
471                None => Self::fresh_connect(socks5.as_ref(), &transport).await?,
472            };
473
474        // ── Build DC pool ───────────────────────────────────────────────
475        let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());
476
477        // Split the TCP stream immediately.
478        // The writer (write half + EncryptedSession) stays in ClientInner.
479        // The read half goes to the reader task which we spawn right now so
480        // that RPC calls during init_connection work correctly.
481        let (writer, read_half, frame_kind) = conn.into_writer();
482        let auth_key   = writer.enc.auth_key_bytes();
483        let session_id = writer.enc.session_id();
484
485        let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
486            Arc::new(Mutex::new(HashMap::new()));
487
488        // Channel the reconnect logic uses to hand a new read half to the reader task.
489        let (reconnect_tx, reconnect_rx) =
490            mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();
491
492        let inner = Arc::new(ClientInner {
493            writer:          Mutex::new(writer),
494            pending:         pending.clone(),
495            reconnect_tx,
496            home_dc_id:      Mutex::new(home_dc_id),
497            dc_options:      Mutex::new(dc_opts),
498            peer_cache:      Mutex::new(PeerCache::default()),
499            pts_state:       Mutex::new(pts::PtsState::default()),
500            api_id:          config.api_id,
501            api_hash:        config.api_hash,
502            retry_policy:    config.retry_policy,
503            socks5:          config.socks5,
504            allow_ipv6:      config.allow_ipv6,
505            transport:       config.transport,
506            session_backend: config.session_backend,
507            dc_pool:         Mutex::new(pool),
508            update_tx:       update_tx,
509        });
510
511        let client = Self {
512            inner,
513            _update_rx: Arc::new(Mutex::new(update_rx)),
514        };
515
516        // Spawn the reader task immediately so that RPC calls during
517        // init_connection can receive their responses.
518        {
519            let client_r = client.clone();
520            tokio::spawn(async move {
521                client_r.run_reader_task(
522                    read_half, frame_kind, auth_key, session_id, reconnect_rx,
523                ).await;
524            });
525        }
526
527        // If init_connection fails (e.g. stale auth key rejected by Telegram),
528        // do a fresh DH handshake and retry once.
529        if let Err(e) = client.init_connection().await {
530            log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");
531
532            let socks5_r    = client.inner.socks5.clone();
533            let transport_r = client.inner.transport.clone();
534            let (new_conn, new_dc_id, new_opts) =
535                Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;
536
537            {
538                let mut dc_guard = client.inner.home_dc_id.lock().await;
539                *dc_guard = new_dc_id;
540            }
541            {
542                let mut opts_guard = client.inner.dc_options.lock().await;
543                *opts_guard = new_opts;
544            }
545
546            // Replace writer and hand new read half to reader task
547            let (new_writer, new_read, new_fk) = new_conn.into_writer();
548            let new_ak = new_writer.enc.auth_key_bytes();
549            let new_sid = new_writer.enc.session_id();
550            *client.inner.writer.lock().await = new_writer;
551            let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));
552
553            client.init_connection().await?;
554        }
555
556        let _ = client.sync_pts_state().await;
557        Ok(client)
558    }
559
560    async fn fresh_connect(
561        socks5:    Option<&crate::socks5::Socks5Config>,
562        transport: &TransportKind,
563    ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
564        log::info!("[layer] Fresh connect to DC2 …");
565        let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
566        let opts = session::default_dc_addresses()
567            .into_iter()
568            .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
569            .collect();
570        Ok((conn, 2, opts))
571    }
572
573    // ── Session ────────────────────────────────────────────────────────────
574
575    pub async fn save_session(&self) -> Result<(), InvocationError> {
576        let writer_guard = self.inner.writer.lock().await;
577        let home_dc_id   = *self.inner.home_dc_id.lock().await;
578        let dc_options   = self.inner.dc_options.lock().await;
579
580        let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
581            dc_id:       e.dc_id,
582            addr:        e.addr.clone(),
583            auth_key:    if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
584            first_salt:  if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
585            time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
586        }).collect();
587        // Collect auth keys from worker DCs in the pool
588        self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
589
590        self.inner.session_backend
591            .save(&PersistedSession { home_dc_id, dcs })
592            .map_err(InvocationError::Io)?;
593        log::info!("[layer] Session saved ✓");
594        Ok(())
595    }
596
597    // ── Auth ───────────────────────────────────────────────────────────────
598
599    /// Returns `true` if the client is already authorized.
600    pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
601        match self.invoke(&tl::functions::updates::GetState {}).await {
602            Ok(_)  => Ok(true),
603            Err(e) if e.is("AUTH_KEY_UNREGISTERED")
604                   || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
605            Err(e) => Err(e),
606        }
607    }
608
609    /// Sign in as a bot.
610    pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
611        let req = tl::functions::auth::ImportBotAuthorization {
612            flags: 0, api_id: self.inner.api_id,
613            api_hash: self.inner.api_hash.clone(),
614            bot_auth_token: token.to_string(),
615        };
616
617        let result = match self.invoke(&req).await {
618            Ok(x) => x,
619            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
620                let dc_id = r.value.unwrap_or(2) as i32;
621                self.migrate_to(dc_id).await?;
622                self.invoke(&req).await?
623            }
624            Err(e) => return Err(e),
625        };
626
627        let name = match result {
628            tl::enums::auth::Authorization::Authorization(a) => {
629                self.cache_user(&a.user).await;
630                Self::extract_user_name(&a.user)
631            }
632            tl::enums::auth::Authorization::SignUpRequired(_) => {
633                panic!("unexpected SignUpRequired during bot sign-in")
634            }
635        };
636        log::info!("[layer] Bot signed in ✓  ({name})");
637        Ok(name)
638    }
639
640    /// Request a login code for a user account.
641    pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
642        use tl::enums::auth::SentCode;
643
644        let req = self.make_send_code_req(phone);
645        let body = match self.rpc_call_raw(&req).await {
646            Ok(b) => b,
647            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
648                let dc_id = r.value.unwrap_or(2) as i32;
649                self.migrate_to(dc_id).await?;
650                self.rpc_call_raw(&req).await?
651            }
652            Err(e) => return Err(e),
653        };
654
655        let mut cur = Cursor::from_slice(&body);
656        let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
657            SentCode::SentCode(s) => s.phone_code_hash,
658            SentCode::Success(_)  => return Err(InvocationError::Deserialize("unexpected Success".into())),
659            SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
660        };
661        log::info!("[layer] Login code sent");
662        Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
663    }
664
665    /// Complete sign-in with the code sent to the phone.
666    pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
667        let req = tl::functions::auth::SignIn {
668            phone_number:    token.phone.clone(),
669            phone_code_hash: token.phone_code_hash.clone(),
670            phone_code:      Some(code.trim().to_string()),
671            email_verification: None,
672        };
673
674        let body = match self.rpc_call_raw(&req).await {
675            Ok(b) => b,
676            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
677                let dc_id = r.value.unwrap_or(2) as i32;
678                self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
679                self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
680            }
681            Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
682                let t = self.get_password_info().await.map_err(SignInError::Other)?;
683                return Err(SignInError::PasswordRequired(t));
684            }
685            Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
686            Err(e) => return Err(SignInError::Other(e)),
687        };
688
689        let mut cur = Cursor::from_slice(&body);
690        match tl::enums::auth::Authorization::deserialize(&mut cur)
691            .map_err(|e| SignInError::Other(e.into()))?
692        {
693            tl::enums::auth::Authorization::Authorization(a) => {
694                self.cache_user(&a.user).await;
695                let name = Self::extract_user_name(&a.user);
696                log::info!("[layer] Signed in ✓  Welcome, {name}!");
697                Ok(name)
698            }
699            tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
700        }
701    }
702
703    /// Complete 2FA login.
704    pub async fn check_password(
705        &self,
706        token:    PasswordToken,
707        password: impl AsRef<[u8]>,
708    ) -> Result<String, InvocationError> {
709        let pw   = token.password;
710        let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
711        let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
712        let g_b  = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
713        let a    = pw.secure_random;
714        let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
715
716        let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
717        let req = tl::functions::auth::CheckPassword {
718            password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
719                tl::types::InputCheckPasswordSrp {
720                    srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
721                },
722            ),
723        };
724
725        let body = self.rpc_call_raw(&req).await?;
726        let mut cur = Cursor::from_slice(&body);
727        match tl::enums::auth::Authorization::deserialize(&mut cur)? {
728            tl::enums::auth::Authorization::Authorization(a) => {
729                self.cache_user(&a.user).await;
730                let name = Self::extract_user_name(&a.user);
731                log::info!("[layer] 2FA ✓  Welcome, {name}!");
732                Ok(name)
733            }
734            tl::enums::auth::Authorization::SignUpRequired(_) =>
735                Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
736        }
737    }
738
739    /// Sign out and invalidate the current session.
740    pub async fn sign_out(&self) -> Result<bool, InvocationError> {
741        let req = tl::functions::auth::LogOut {};
742        match self.rpc_call_raw(&req).await {
743            Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
744            Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
745            Err(e) => Err(e),
746        }
747    }
748
749    // ── Get self ───────────────────────────────────────────────────────────
750
751    /// Fetch information about the logged-in user.
752    pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
753        let req = tl::functions::users::GetUsers {
754            id: vec![tl::enums::InputUser::UserSelf],
755        };
756        let body    = self.rpc_call_raw(&req).await?;
757        let mut cur = Cursor::from_slice(&body);
758        let users   = Vec::<tl::enums::User>::deserialize(&mut cur)?;
759        self.cache_users_slice(&users).await;
760        users.into_iter().find_map(|u| match u {
761            tl::enums::User::User(u) => Some(u),
762            _ => None,
763        }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
764    }
765
766    // ── Updates ────────────────────────────────────────────────────────────
767
768    /// Return an [`UpdateStream`] that yields incoming [`Update`]s.
769    ///
770    /// The reader task (started inside `connect()`) sends all updates to
771    /// `inner.update_tx`. This method proxies those updates into a fresh
772    /// caller-owned channel — typically called once per bot/app loop.
773    pub fn stream_updates(&self) -> UpdateStream {
774        let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
775        // The internal channel is: reader_task → update_tx → _update_rx
776        // We forward _update_rx → caller_tx so the caller gets an owned stream.
777        let internal_rx = self._update_rx.clone();
778        tokio::spawn(async move {
779            // Lock once and hold — this is the sole consumer of the internal channel.
780            let mut guard = internal_rx.lock().await;
781            while let Some(upd) = guard.recv().await {
782                if caller_tx.send(upd).is_err() { break; }
783            }
784        });
785        UpdateStream { rx }
786    }
787
788    // ── Reader task ────────────────────────────────────────────────────────
789    // Started once inside connect(). Owns the TCP read half forever.
790    // Decrypts frames without holding any lock, then routes:
791    //   rpc_result  → pending map (oneshot to waiting RPC caller)
792    //   update      → update_tx  (delivered to stream_updates consumers)
793    //   bad_server_salt → updates writer salt
794    //
795    // DC migration / reconnect: the new read half arrives via new_conn_rx.
796    // The select! between recv_frame_owned and new_conn_rx.recv() ensures we
797    // switch to the new connection immediately, without waiting for the next
798    // frame on the old (now stale) connection.
799
800    async fn run_reader_task(
801        &self,
802        read_half:   OwnedReadHalf,
803        frame_kind:  FrameKind,
804        auth_key:    [u8; 256],
805        session_id:  i64,
806        mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
807    ) {
808        self.reader_loop(read_half, frame_kind, auth_key, session_id, &mut new_conn_rx).await;
809    }
810
811    async fn reader_loop(
812        &self,
813        mut rh:          OwnedReadHalf,
814        mut fk:          FrameKind,
815        mut ak:          [u8; 256],
816        mut sid:         i64,
817        new_conn_rx:     &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
818    ) {
819        loop {
820            // Race the frame-read against an incoming new-connection signal.
821            // Both branches are polled concurrently; whichever resolves first wins.
822            // If new_conn_rx wins, the frame-read future is dropped (closing old rh).
823            tokio::select! {
824                // Normal frame (or keepalive timeout) from current connection
825                outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
826                    match outcome {
827                        FrameOutcome::Frame(mut raw) => {
828                            let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
829                                Ok(m)  => m,
830                                Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
831                            };
832                            if msg.salt != 0 {
833                                self.inner.writer.lock().await.enc.salt = msg.salt;
834                            }
835                            self.route_frame(msg.body).await;
836                        }
837                        FrameOutcome::Error(e) => {
838                            log::warn!("[layer] Reader: error: {e} — reconnecting …");
839                            self.inner.pending.lock().await.clear();
840                            sleep(Duration::from_secs(1)).await;
841                            match self.do_reconnect(&ak, &fk).await {
842                                Ok((new_rh, new_fk, new_ak, new_sid)) => {
843                                    rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
844                                    let c = self.clone();
845                                    let utx = self.inner.update_tx.clone();
846                                    tokio::spawn(async move {
847                                        if let Ok(missed) = c.get_difference().await {
848                                            for u in missed { let _ = utx.send(u); }
849                                        }
850                                    });
851                                }
852                                Err(e2) => { log::error!("[layer] Reconnect failed: {e2}"); break; }
853                            }
854                        }
855                        FrameOutcome::Keepalive => {} // ping was sent; loop again
856                    }
857                }
858
859                // A new connection arrived (DC migration or deliberate reconnect)
860                maybe = new_conn_rx.recv() => {
861                    if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
862                        // frame-read future was dropped → old TCP read half is closed
863                        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
864                        log::info!("[layer] Reader: switched to new connection");
865                    } else {
866                        break; // reconnect_tx dropped → client shutting down
867                    }
868                }
869            }
870        }
871    }
872
873    /// Route a decrypted MTProto frame body to either a pending RPC caller or update_tx.
874    async fn route_frame(&self, body: Vec<u8>) {
875        if body.len() < 4 { return; }
876        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
877
878        match cid {
879            ID_RPC_RESULT => {
880                // body[4..12] = req_msg_id of the request this is answering
881                if body.len() < 12 { return; }
882                let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
883                let inner = body[12..].to_vec();
884                // Recursively unwrap the inner payload (handles gzip, containers, etc.)
885                let result = unwrap_envelope(inner);
886                if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
887                    let to_send = match result {
888                        Ok(EnvelopeResult::Payload(p)) => Ok(p),
889                        Ok(EnvelopeResult::Updates(us)) => {
890                            // Write RPCs return Updates — forward them and signal success
891                            for u in us { let _ = self.inner.update_tx.send(u); }
892                            Ok(vec![])
893                        }
894                        Ok(EnvelopeResult::None) => Ok(vec![]),
895                        Err(e) => Err(e),
896                    };
897                    let _ = tx.send(to_send);
898                }
899            }
900            // ID_RPC_ERROR only appears as the INNER body of ID_RPC_RESULT.
901            // At top level it has no req_msg_id — ignore it here (the ID_RPC_RESULT
902            // handler above uses unwrap_envelope which correctly returns Err for it).
903            ID_RPC_ERROR => {
904                log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
905            }
906            ID_MSG_CONTAINER => {
907                // Container of multiple inner messages — recurse into each
908                if body.len() < 8 { return; }
909                let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
910                let mut pos = 8usize;
911                for _ in 0..count {
912                    if pos + 16 > body.len() { break; }
913                    let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
914                    pos += 16;
915                    if pos + inner_len > body.len() { break; }
916                    let inner = body[pos..pos + inner_len].to_vec();
917                    pos += inner_len;
918                    Box::pin(self.route_frame(inner)).await;
919                }
920            }
921            ID_GZIP_PACKED => {
922                let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
923                if let Ok(inflated) = gz_inflate(&bytes) {
924                    Box::pin(self.route_frame(inflated)).await;
925                }
926            }
927            ID_BAD_SERVER_SALT => {
928                // Server is telling us to use a different salt — update writer enc
929                if body.len() >= 16 {
930                    let new_salt = i64::from_le_bytes(body[8..16].try_into().unwrap());
931                    self.inner.writer.lock().await.enc.salt = new_salt;
932                }
933            }
934            ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
935            | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
936            | ID_UPDATES_TOO_LONG => {
937                for u in update::parse_updates(&body) {
938                    let _ = self.inner.update_tx.send(u);
939                }
940            }
941            // Silently acknowledged service messages — pong, acks, etc.
942            _ => {}
943        }
944    }
945
946    /// Reconnect to the home DC, replace the writer, and return the new read half.
947    async fn do_reconnect(
948        &self,
949        _old_auth_key: &[u8; 256],
950        _old_frame_kind: &FrameKind,
951    ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
952        let home_dc_id = *self.inner.home_dc_id.lock().await;
953        let (addr, saved_key, first_salt, time_offset) = {
954            let opts = self.inner.dc_options.lock().await;
955            match opts.get(&home_dc_id) {
956                Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
957                None    => ("149.154.167.51:443".to_string(), None, 0, 0),
958            }
959        };
960        let socks5    = self.inner.socks5.clone();
961        let transport = self.inner.transport.clone();
962
963        let new_conn = if let Some(key) = saved_key {
964            log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
965            match Connection::connect_with_key(
966                &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
967            ).await {
968                Ok(c)   => c,
969                Err(e2) => {
970                    log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
971                    Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
972                }
973            }
974        } else {
975            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
976        };
977
978        let (new_writer, new_read, new_fk) = new_conn.into_writer();
979        let new_ak  = new_writer.enc.auth_key_bytes();
980        let new_sid = new_writer.enc.session_id();
981        *self.inner.writer.lock().await = new_writer;
982
983        if let Err(e2) = self.init_connection().await {
984            log::warn!("[layer] init_connection after reconnect failed: {e2}");
985        }
986
987        Ok((new_read, new_fk, new_ak, new_sid))
988    }
989
990    // ── Messaging ──────────────────────────────────────────────────────────
991
992    /// Send a text message. Use `"me"` for Saved Messages.
993    pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
994        let p = self.resolve_peer(peer).await?;
995        self.send_message_to_peer(p, text).await
996    }
997
998    /// Send a message to an already-resolved peer (plain text shorthand).
999    pub async fn send_message_to_peer(
1000        &self,
1001        peer: tl::enums::Peer,
1002        text: &str,
1003    ) -> Result<(), InvocationError> {
1004        self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
1005    }
1006
1007    /// Send a message with full [`InputMessage`] options.
1008    pub async fn send_message_to_peer_ex(
1009        &self,
1010        peer: tl::enums::Peer,
1011        msg:  &InputMessage,
1012    ) -> Result<(), InvocationError> {
1013        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1014        let req = tl::functions::messages::SendMessage {
1015            no_webpage:               msg.no_webpage,
1016            silent:                   msg.silent,
1017            background:               msg.background,
1018            clear_draft:              msg.clear_draft,
1019            noforwards:               false,
1020            update_stickersets_order: false,
1021            invert_media:             false,
1022            allow_paid_floodskip:     false,
1023            peer:                     input_peer,
1024            reply_to:                 msg.reply_header(),
1025            message:                  msg.text.clone(),
1026            random_id:                random_i64(),
1027            reply_markup:             msg.reply_markup.clone(),
1028            entities:                 msg.entities.clone(),
1029            schedule_date:            msg.schedule_date,
1030            schedule_repeat_period:   None,
1031            send_as:                  None,
1032            quick_reply_shortcut:     None,
1033            effect:                   None,
1034            allow_paid_stars:         None,
1035            suggested_post:           None,
1036        };
1037        self.rpc_write(&req).await
1038    }
1039
1040    /// Send directly to Saved Messages.
1041    pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
1042        let req = tl::functions::messages::SendMessage {
1043            no_webpage:               false,
1044            silent:                   false,
1045            background:               false,
1046            clear_draft:              false,
1047            noforwards:               false,
1048            update_stickersets_order: false,
1049            invert_media:             false,
1050            allow_paid_floodskip:     false,
1051            peer:                     tl::enums::InputPeer::PeerSelf,
1052            reply_to:                 None,
1053            message:                  text.to_string(),
1054            random_id:                random_i64(),
1055            reply_markup:             None,
1056            entities:                 None,
1057            schedule_date:            None,
1058            schedule_repeat_period:   None,
1059            send_as:                  None,
1060            quick_reply_shortcut:     None,
1061            effect:                   None,
1062            allow_paid_stars:         None,
1063            suggested_post:           None,
1064        };
1065        self.rpc_write(&req).await
1066    }
1067
1068    /// Edit an existing message.
1069    pub async fn edit_message(
1070        &self,
1071        peer:       tl::enums::Peer,
1072        message_id: i32,
1073        new_text:   &str,
1074    ) -> Result<(), InvocationError> {
1075        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1076        let req = tl::functions::messages::EditMessage {
1077            no_webpage:    false,
1078            invert_media:  false,
1079            peer:          input_peer,
1080            id:            message_id,
1081            message:       Some(new_text.to_string()),
1082            media:         None,
1083            reply_markup:  None,
1084            entities:      None,
1085            schedule_date: None,
1086            schedule_repeat_period: None,
1087            quick_reply_shortcut_id: None,
1088        };
1089        self.rpc_write(&req).await
1090    }
1091
1092    /// Forward messages from `source` to `destination`.
1093    pub async fn forward_messages(
1094        &self,
1095        destination: tl::enums::Peer,
1096        message_ids: &[i32],
1097        source:      tl::enums::Peer,
1098    ) -> Result<(), InvocationError> {
1099        let cache = self.inner.peer_cache.lock().await;
1100        let to_peer   = cache.peer_to_input(&destination);
1101        let from_peer = cache.peer_to_input(&source);
1102        drop(cache);
1103
1104        let req = tl::functions::messages::ForwardMessages {
1105            silent:            false,
1106            background:        false,
1107            with_my_score:     false,
1108            drop_author:       false,
1109            drop_media_captions: false,
1110            noforwards:        false,
1111            from_peer:         from_peer,
1112            id:                message_ids.to_vec(),
1113            random_id:         (0..message_ids.len()).map(|_| random_i64()).collect(),
1114            to_peer:           to_peer,
1115            top_msg_id:        None,
1116            reply_to:          None,
1117            schedule_date:     None,
1118            schedule_repeat_period: None,
1119            send_as:           None,
1120            quick_reply_shortcut: None,
1121            effect:            None,
1122            video_timestamp:   None,
1123            allow_paid_stars:  None,
1124            allow_paid_floodskip: false,
1125            suggested_post:    None,
1126        };
1127        self.rpc_write(&req).await
1128    }
1129
1130    /// Delete messages by ID.
1131    pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
1132        let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
1133        self.rpc_write(&req).await
1134    }
1135
1136    /// Get messages by their IDs from a peer.
1137    pub async fn get_messages_by_id(
1138        &self,
1139        peer: tl::enums::Peer,
1140        ids:  &[i32],
1141    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1142        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1143        let id_list: Vec<tl::enums::InputMessage> = ids.iter()
1144            .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
1145            .collect();
1146        let req  = tl::functions::channels::GetMessages {
1147            channel: match &input_peer {
1148                tl::enums::InputPeer::Channel(c) =>
1149                    tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1150                        channel_id: c.channel_id, access_hash: c.access_hash
1151                    }),
1152                _ => return self.get_messages_user(input_peer, id_list).await,
1153            },
1154            id: id_list,
1155        };
1156        let body    = self.rpc_call_raw(&req).await?;
1157        let mut cur = Cursor::from_slice(&body);
1158        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1159            tl::enums::messages::Messages::Messages(m) => m.messages,
1160            tl::enums::messages::Messages::Slice(m)    => m.messages,
1161            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1162            tl::enums::messages::Messages::NotModified(_) => vec![],
1163        };
1164        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1165    }
1166
1167    async fn get_messages_user(
1168        &self,
1169        _peer: tl::enums::InputPeer,
1170        ids:   Vec<tl::enums::InputMessage>,
1171    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1172        let req = tl::functions::messages::GetMessages { id: ids };
1173        let body    = self.rpc_call_raw(&req).await?;
1174        let mut cur = Cursor::from_slice(&body);
1175        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1176            tl::enums::messages::Messages::Messages(m) => m.messages,
1177            tl::enums::messages::Messages::Slice(m)    => m.messages,
1178            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1179            tl::enums::messages::Messages::NotModified(_) => vec![],
1180        };
1181        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1182    }
1183
1184    /// Get the pinned message in a chat.
1185    pub async fn get_pinned_message(
1186        &self,
1187        peer: tl::enums::Peer,
1188    ) -> Result<Option<update::IncomingMessage>, InvocationError> {
1189        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1190        let req = tl::functions::messages::Search {
1191            peer: input_peer,
1192            q: String::new(),
1193            from_id: None,
1194            saved_peer_id: None,
1195            saved_reaction: None,
1196            top_msg_id: None,
1197            filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
1198            min_date: 0,
1199            max_date: 0,
1200            offset_id: 0,
1201            add_offset: 0,
1202            limit: 1,
1203            max_id: 0,
1204            min_id: 0,
1205            hash: 0,
1206        };
1207        let body    = self.rpc_call_raw(&req).await?;
1208        let mut cur = Cursor::from_slice(&body);
1209        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1210            tl::enums::messages::Messages::Messages(m) => m.messages,
1211            tl::enums::messages::Messages::Slice(m)    => m.messages,
1212            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1213            tl::enums::messages::Messages::NotModified(_) => vec![],
1214        };
1215        Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
1216    }
1217
1218    /// Pin a message in a chat.
1219    pub async fn pin_message(
1220        &self,
1221        peer:       tl::enums::Peer,
1222        message_id: i32,
1223        silent:     bool,
1224        unpin:      bool,
1225        pm_oneside: bool,
1226    ) -> Result<(), InvocationError> {
1227        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1228        let req = tl::functions::messages::UpdatePinnedMessage {
1229            silent,
1230            unpin,
1231            pm_oneside,
1232            peer: input_peer,
1233            id:   message_id,
1234        };
1235        self.rpc_write(&req).await
1236    }
1237
1238    /// Unpin a specific message.
1239    pub async fn unpin_message(
1240        &self,
1241        peer:       tl::enums::Peer,
1242        message_id: i32,
1243    ) -> Result<(), InvocationError> {
1244        self.pin_message(peer, message_id, true, true, false).await
1245    }
1246
1247    /// Unpin all messages in a chat.
1248    pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1249        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1250        let req = tl::functions::messages::UnpinAllMessages {
1251            peer:      input_peer,
1252            top_msg_id: None,
1253            saved_peer_id: None,
1254        };
1255        self.rpc_write(&req).await
1256    }
1257
1258    // ── Message search ─────────────────────────────────────────────────────
1259
1260    /// Search messages in a chat.
1261    pub async fn search_messages(
1262        &self,
1263        peer:  tl::enums::Peer,
1264        query: &str,
1265        limit: i32,
1266    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1267        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1268        let req = tl::functions::messages::Search {
1269            peer:         input_peer,
1270            q:            query.to_string(),
1271            from_id:      None,
1272            saved_peer_id: None,
1273            saved_reaction: None,
1274            top_msg_id:   None,
1275            filter:       tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1276            min_date:     0,
1277            max_date:     0,
1278            offset_id:    0,
1279            add_offset:   0,
1280            limit,
1281            max_id:       0,
1282            min_id:       0,
1283            hash:         0,
1284        };
1285        let body    = self.rpc_call_raw(&req).await?;
1286        let mut cur = Cursor::from_slice(&body);
1287        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1288            tl::enums::messages::Messages::Messages(m) => m.messages,
1289            tl::enums::messages::Messages::Slice(m)    => m.messages,
1290            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1291            tl::enums::messages::Messages::NotModified(_) => vec![],
1292        };
1293        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1294    }
1295
1296    /// Search messages globally across all chats.
1297    pub async fn search_global(
1298        &self,
1299        query: &str,
1300        limit: i32,
1301    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1302        let req = tl::functions::messages::SearchGlobal {
1303            broadcasts_only: false,
1304            groups_only:     false,
1305            users_only:      false,
1306            folder_id:       None,
1307            q:               query.to_string(),
1308            filter:          tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1309            min_date:        0,
1310            max_date:        0,
1311            offset_rate:     0,
1312            offset_peer:     tl::enums::InputPeer::Empty,
1313            offset_id:       0,
1314            limit,
1315        };
1316        let body    = self.rpc_call_raw(&req).await?;
1317        let mut cur = Cursor::from_slice(&body);
1318        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1319            tl::enums::messages::Messages::Messages(m) => m.messages,
1320            tl::enums::messages::Messages::Slice(m)    => m.messages,
1321            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1322            tl::enums::messages::Messages::NotModified(_) => vec![],
1323        };
1324        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1325    }
1326
1327    // ── Scheduled messages ─────────────────────────────────────────────────
1328
1329    /// Retrieve all scheduled messages in a chat.
1330    ///
1331    /// Scheduled messages are messages set to be sent at a future time using
1332    /// [`InputMessage::schedule_date`].  Returns them newest-first.
1333    ///
1334    /// # Example
1335    /// ```rust,no_run
1336    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
1337    /// let scheduled = client.get_scheduled_messages(peer).await?;
1338    /// for msg in &scheduled {
1339    ///     println!("Scheduled: {:?} at {:?}", msg.text(), msg.date());
1340    /// }
1341    /// # Ok(()) }
1342    /// ```
1343    pub async fn get_scheduled_messages(
1344        &self,
1345        peer: tl::enums::Peer,
1346    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1347        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1348        let req = tl::functions::messages::GetScheduledHistory {
1349            peer: input_peer,
1350            hash: 0,
1351        };
1352        let body    = self.rpc_call_raw(&req).await?;
1353        let mut cur = Cursor::from_slice(&body);
1354        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1355            tl::enums::messages::Messages::Messages(m)        => m.messages,
1356            tl::enums::messages::Messages::Slice(m)           => m.messages,
1357            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1358            tl::enums::messages::Messages::NotModified(_)     => vec![],
1359        };
1360        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1361    }
1362
1363    /// Delete one or more scheduled messages by their IDs.
1364    pub async fn delete_scheduled_messages(
1365        &self,
1366        peer: tl::enums::Peer,
1367        ids:  Vec<i32>,
1368    ) -> Result<(), InvocationError> {
1369        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1370        let req = tl::functions::messages::DeleteScheduledMessages {
1371            peer: input_peer,
1372            id:   ids,
1373        };
1374        self.rpc_write(&req).await
1375    }
1376
1377    // ── Callback / Inline Queries ──────────────────────────────────────────
1378
1379    pub async fn answer_callback_query(
1380        &self,
1381        query_id: i64,
1382        text:     Option<&str>,
1383        alert:    bool,
1384    ) -> Result<bool, InvocationError> {
1385        let req = tl::functions::messages::SetBotCallbackAnswer {
1386            alert,
1387            query_id,
1388            message:    text.map(|s| s.to_string()),
1389            url:        None,
1390            cache_time: 0,
1391        };
1392        let body = self.rpc_call_raw(&req).await?;
1393        Ok(!body.is_empty())
1394    }
1395
1396    pub async fn answer_inline_query(
1397        &self,
1398        query_id:    i64,
1399        results:     Vec<tl::enums::InputBotInlineResult>,
1400        cache_time:  i32,
1401        is_personal: bool,
1402        next_offset: Option<String>,
1403    ) -> Result<bool, InvocationError> {
1404        let req = tl::functions::messages::SetInlineBotResults {
1405            gallery:        false,
1406            private:        is_personal,
1407            query_id,
1408            results,
1409            cache_time,
1410            next_offset,
1411            switch_pm:      None,
1412            switch_webview: None,
1413        };
1414        let body = self.rpc_call_raw(&req).await?;
1415        Ok(!body.is_empty())
1416    }
1417
1418    // ── Dialogs ────────────────────────────────────────────────────────────
1419
1420    /// Fetch up to `limit` dialogs, most recent first. Populates entity/message.
1421    pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
1422        let req = tl::functions::messages::GetDialogs {
1423            exclude_pinned: false,
1424            folder_id:      None,
1425            offset_date:    0,
1426            offset_id:      0,
1427            offset_peer:    tl::enums::InputPeer::Empty,
1428            limit,
1429            hash:           0,
1430        };
1431
1432        let body    = self.rpc_call_raw(&req).await?;
1433        let mut cur = Cursor::from_slice(&body);
1434        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1435            tl::enums::messages::Dialogs::Dialogs(d) => d,
1436            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
1437                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1438            },
1439            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1440        };
1441
1442        // Build message map
1443        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1444            .filter_map(|m| {
1445                let id = match &m {
1446                    tl::enums::Message::Message(x) => x.id,
1447                    tl::enums::Message::Service(x) => x.id,
1448                    tl::enums::Message::Empty(x)   => x.id,
1449                };
1450                Some((id, m))
1451            })
1452            .collect();
1453
1454        // Build user map
1455        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1456            .filter_map(|u| {
1457                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1458            })
1459            .collect();
1460
1461        // Build chat map
1462        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1463            .filter_map(|c| {
1464                let id = match &c {
1465                    tl::enums::Chat::Chat(x)             => x.id,
1466                    tl::enums::Chat::Forbidden(x)    => x.id,
1467                    tl::enums::Chat::Channel(x)          => x.id,
1468                    tl::enums::Chat::ChannelForbidden(x) => x.id,
1469                    tl::enums::Chat::Empty(x)            => x.id,
1470                };
1471                Some((id, c))
1472            })
1473            .collect();
1474
1475        // Cache peers for future access_hash lookups
1476        {
1477            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1478            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1479            self.cache_users_slice(&u_list).await;
1480            self.cache_chats_slice(&c_list).await;
1481        }
1482
1483        let result = raw.dialogs.into_iter().map(|d| {
1484            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1485            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1486
1487            let message = msg_map.get(&top_id).cloned();
1488            let entity = peer.and_then(|p| match p {
1489                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1490                _ => None,
1491            });
1492            let chat = peer.and_then(|p| match p {
1493                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
1494                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1495                _ => None,
1496            });
1497
1498            Dialog { raw: d, message, entity, chat }
1499        }).collect();
1500
1501        Ok(result)
1502    }
1503
1504    /// Internal helper: fetch dialogs with a custom GetDialogs request.
1505    async fn get_dialogs_raw(
1506        &self,
1507        req: tl::functions::messages::GetDialogs,
1508    ) -> Result<Vec<Dialog>, InvocationError> {
1509        let body    = self.rpc_call_raw(&req).await?;
1510        let mut cur = Cursor::from_slice(&body);
1511        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1512            tl::enums::messages::Dialogs::Dialogs(d) => d,
1513            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
1514                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1515            },
1516            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1517        };
1518
1519        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1520            .filter_map(|m| {
1521                let id = match &m {
1522                    tl::enums::Message::Message(x) => x.id,
1523                    tl::enums::Message::Service(x) => x.id,
1524                    tl::enums::Message::Empty(x)   => x.id,
1525                };
1526                Some((id, m))
1527            })
1528            .collect();
1529
1530        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1531            .filter_map(|u| {
1532                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1533            })
1534            .collect();
1535
1536        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1537            .filter_map(|c| {
1538                let id = match &c {
1539                    tl::enums::Chat::Chat(x)             => x.id,
1540                    tl::enums::Chat::Forbidden(x)    => x.id,
1541                    tl::enums::Chat::Channel(x)          => x.id,
1542                    tl::enums::Chat::ChannelForbidden(x) => x.id,
1543                    tl::enums::Chat::Empty(x)            => x.id,
1544                };
1545                Some((id, c))
1546            })
1547            .collect();
1548
1549        {
1550            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1551            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1552            self.cache_users_slice(&u_list).await;
1553            self.cache_chats_slice(&c_list).await;
1554        }
1555
1556        let result = raw.dialogs.into_iter().map(|d| {
1557            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1558            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1559
1560            let message = msg_map.get(&top_id).cloned();
1561            let entity = peer.and_then(|p| match p {
1562                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1563                _ => None,
1564            });
1565            let chat = peer.and_then(|p| match p {
1566                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
1567                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1568                _ => None,
1569            });
1570
1571            Dialog { raw: d, message, entity, chat }
1572        }).collect();
1573
1574        Ok(result)
1575    }
1576    pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1577        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1578        let req = tl::functions::messages::DeleteHistory {
1579            just_clear:  false,
1580            revoke:      false,
1581            peer:        input_peer,
1582            max_id:      0,
1583            min_date:    None,
1584            max_date:    None,
1585        };
1586        self.rpc_write(&req).await
1587    }
1588
1589    /// Mark all messages in a chat as read.
1590    pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1591        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1592        match &input_peer {
1593            tl::enums::InputPeer::Channel(c) => {
1594                let req = tl::functions::channels::ReadHistory {
1595                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1596                        channel_id: c.channel_id, access_hash: c.access_hash,
1597                    }),
1598                    max_id: 0,
1599                };
1600                self.rpc_call_raw(&req).await?;
1601            }
1602            _ => {
1603                let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
1604                self.rpc_call_raw(&req).await?;
1605            }
1606        }
1607        Ok(())
1608    }
1609
1610    /// Clear unread mention markers.
1611    pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1612        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1613        let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
1614        self.rpc_write(&req).await
1615    }
1616
1617    // ── Chat actions (typing, etc) ─────────────────────────────────────────
1618
1619    /// Send a chat action (typing indicator, uploading photo, etc).
1620    ///
1621    /// For "typing" use `tl::enums::SendMessageAction::Typing`.
1622    pub async fn send_chat_action(
1623        &self,
1624        peer:   tl::enums::Peer,
1625        action: tl::enums::SendMessageAction,
1626    ) -> Result<(), InvocationError> {
1627        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1628        let req = tl::functions::messages::SetTyping {
1629            peer: input_peer,
1630            top_msg_id: None,
1631            action,
1632        };
1633        self.rpc_write(&req).await
1634    }
1635
1636    // ── Join / invite links ────────────────────────────────────────────────
1637
1638    /// Join a public chat or channel by username/peer.
1639    pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1640        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1641        match input_peer {
1642            tl::enums::InputPeer::Channel(c) => {
1643                let req = tl::functions::channels::JoinChannel {
1644                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1645                        channel_id: c.channel_id, access_hash: c.access_hash,
1646                    }),
1647                };
1648                self.rpc_call_raw(&req).await?;
1649            }
1650            tl::enums::InputPeer::Chat(c) => {
1651                let req = tl::functions::messages::AddChatUser {
1652                    chat_id:  c.chat_id,
1653                    user_id:  tl::enums::InputUser::UserSelf,
1654                    fwd_limit: 0,
1655                };
1656                self.rpc_call_raw(&req).await?;
1657            }
1658            _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
1659        }
1660        Ok(())
1661    }
1662
1663    /// Accept and join via an invite link.
1664    pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
1665        let hash = Self::parse_invite_hash(link)
1666            .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
1667        let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
1668        self.rpc_write(&req).await
1669    }
1670
1671    /// Extract hash from `https://t.me/+HASH` or `https://t.me/joinchat/HASH`.
1672    pub fn parse_invite_hash(link: &str) -> Option<&str> {
1673        if let Some(pos) = link.find("/+") {
1674            return Some(&link[pos + 2..]);
1675        }
1676        if let Some(pos) = link.find("/joinchat/") {
1677            return Some(&link[pos + 10..]);
1678        }
1679        None
1680    }
1681
1682    // ── Message history (paginated) ────────────────────────────────────────
1683
1684    /// Fetch a page of messages from a peer's history.
1685    pub async fn get_messages(
1686        &self,
1687        peer:      tl::enums::InputPeer,
1688        limit:     i32,
1689        offset_id: i32,
1690    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1691        let req = tl::functions::messages::GetHistory {
1692            peer, offset_id, offset_date: 0, add_offset: 0,
1693            limit, max_id: 0, min_id: 0, hash: 0,
1694        };
1695        let body    = self.rpc_call_raw(&req).await?;
1696        let mut cur = Cursor::from_slice(&body);
1697        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1698            tl::enums::messages::Messages::Messages(m) => m.messages,
1699            tl::enums::messages::Messages::Slice(m)    => m.messages,
1700            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1701            tl::enums::messages::Messages::NotModified(_) => vec![],
1702        };
1703        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1704    }
1705
1706    // ── Peer resolution ────────────────────────────────────────────────────
1707
1708    /// Resolve a peer string to a [`tl::enums::Peer`].
1709    pub async fn resolve_peer(
1710        &self,
1711        peer: &str,
1712    ) -> Result<tl::enums::Peer, InvocationError> {
1713        match peer.trim() {
1714            "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
1715            username if username.starts_with('@') => {
1716                self.resolve_username(&username[1..]).await
1717            }
1718            id_str => {
1719                if let Ok(id) = id_str.parse::<i64>() {
1720                    Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
1721                } else {
1722                    Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
1723                }
1724            }
1725        }
1726    }
1727
1728    async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
1729        let req  = tl::functions::contacts::ResolveUsername {
1730            username: username.to_string(), referer: None,
1731        };
1732        let body = self.rpc_call_raw(&req).await?;
1733        let mut cur = Cursor::from_slice(&body);
1734        let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
1735            tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
1736        };
1737        // Cache users and chats from the resolution
1738        self.cache_users_slice(&resolved.users).await;
1739        self.cache_chats_slice(&resolved.chats).await;
1740        Ok(resolved.peer)
1741    }
1742
1743    // ── Raw invoke ─────────────────────────────────────────────────────────
1744
1745    /// Invoke any TL function directly, handling flood-wait retries.
1746    pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
1747        let body = self.rpc_call_raw(req).await?;
1748        let mut cur = Cursor::from_slice(&body);
1749        R::Return::deserialize(&mut cur).map_err(Into::into)
1750    }
1751
1752    async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1753        let mut fail_count   = NonZeroU32::new(1).unwrap();
1754        let mut slept_so_far = Duration::default();
1755        loop {
1756            match self.do_rpc_call(req).await {
1757                Ok(body) => return Ok(body),
1758                Err(e) => {
1759                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
1760                    match self.inner.retry_policy.should_retry(&ctx) {
1761                        ControlFlow::Continue(delay) => {
1762                            sleep(delay).await;
1763                            slept_so_far += delay;
1764                            fail_count = fail_count.saturating_add(1);
1765                        }
1766                        ControlFlow::Break(()) => return Err(ctx.error),
1767                    }
1768                }
1769            }
1770        }
1771    }
1772
1773    /// Send an RPC call and await the response via a oneshot channel.
1774    ///
1775    /// This is the core of the split-stream design:
1776    ///  1. Pack the request and get its msg_id.
1777    ///  2. Register a oneshot Sender in the pending map (BEFORE sending).
1778    ///  3. Send the frame while holding the writer lock.
1779    ///  4. Release the writer lock immediately — the reader task now runs freely.
1780    ///  5. Await the oneshot Receiver; the reader task will fulfill it when
1781    ///     the matching rpc_result frame arrives.
1782    async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1783        let (tx, rx) = oneshot::channel();
1784        {
1785            let mut w = self.inner.writer.lock().await;
1786            let (wire, msg_id) = w.enc.pack_with_msg_id(req);
1787            let fk = w.frame_kind.clone();
1788            // Insert BEFORE sending — response cannot arrive before we send, but
1789            // inserting first is the safe contract.
1790            self.inner.pending.lock().await.insert(msg_id, tx);
1791            send_frame_write(&mut w.write_half, &wire, &fk).await?;
1792        }
1793        // Writer lock is released. Reader task can now freely read from the socket
1794        // without competing with us.
1795        match tokio::time::timeout(Duration::from_secs(30), rx).await {
1796            Ok(Ok(result)) => result,
1797            Ok(Err(_))     => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
1798            Err(_)         => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
1799        }
1800    }
1801
1802    /// Like `rpc_call_raw` but for write RPCs (Serializable, return type is Updates).
1803    /// Uses the same oneshot mechanism — the reader task signals success/failure.
1804    async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
1805        let mut fail_count   = NonZeroU32::new(1).unwrap();
1806        let mut slept_so_far = Duration::default();
1807        loop {
1808            let result = self.do_rpc_write(req).await;
1809            match result {
1810                Ok(()) => return Ok(()),
1811                Err(e) => {
1812                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
1813                    match self.inner.retry_policy.should_retry(&ctx) {
1814                        ControlFlow::Continue(delay) => {
1815                            sleep(delay).await;
1816                            slept_so_far += delay;
1817                            fail_count = fail_count.saturating_add(1);
1818                        }
1819                        ControlFlow::Break(()) => return Err(ctx.error),
1820                    }
1821                }
1822            }
1823        }
1824    }
1825
1826    async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
1827        let (tx, rx) = oneshot::channel();
1828        {
1829            let mut w = self.inner.writer.lock().await;
1830            let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
1831            let fk = w.frame_kind.clone();
1832            self.inner.pending.lock().await.insert(msg_id, tx);
1833            send_frame_write(&mut w.write_half, &wire, &fk).await?;
1834        }
1835        match tokio::time::timeout(Duration::from_secs(30), rx).await {
1836            Ok(Ok(result)) => result.map(|_| ()),  // ignore body for write RPCs
1837            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
1838            Err(_)         => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
1839        }
1840    }
1841
1842    // ── initConnection ─────────────────────────────────────────────────────
1843
1844    async fn init_connection(&self) -> Result<(), InvocationError> {
1845        use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
1846        let req = InvokeWithLayer {
1847            layer: tl::LAYER,
1848            query: InitConnection {
1849                api_id:           self.inner.api_id,
1850                device_model:     "Linux".to_string(),
1851                system_version:   "1.0".to_string(),
1852                app_version:      env!("CARGO_PKG_VERSION").to_string(),
1853                system_lang_code: "en".to_string(),
1854                lang_pack:        "".to_string(),
1855                lang_code:        "en".to_string(),
1856                proxy:            None,
1857                params:           None,
1858                query:            GetConfig {},
1859            },
1860        };
1861
1862        // Use the split-writer oneshot path (reader task routes the response).
1863        let body = self.rpc_call_raw_serializable(&req).await?;
1864
1865        let mut cur = Cursor::from_slice(&body);
1866        if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
1867            let allow_ipv6 = self.inner.allow_ipv6;
1868            let mut opts = self.inner.dc_options.lock().await;
1869            for opt in &cfg.dc_options {
1870                let tl::enums::DcOption::DcOption(o) = opt;
1871                if o.media_only || o.cdn || o.tcpo_only { continue; }
1872                if o.ipv6 && !allow_ipv6 { continue; }
1873                let addr = format!("{}:{}", o.ip_address, o.port);
1874                let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
1875                    dc_id: o.id, addr: addr.clone(),
1876                    auth_key: None, first_salt: 0, time_offset: 0,
1877                });
1878                entry.addr = addr;
1879            }
1880            log::info!("[layer] initConnection ✓  ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
1881        }
1882        Ok(())
1883    }
1884
1885    // ── DC migration ───────────────────────────────────────────────────────
1886
1887    async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
1888        let addr = {
1889            let opts = self.inner.dc_options.lock().await;
1890            opts.get(&new_dc_id).map(|e| e.addr.clone())
1891                .unwrap_or_else(|| "149.154.167.51:443".to_string())
1892        };
1893        log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
1894
1895        let saved_key = {
1896            let opts = self.inner.dc_options.lock().await;
1897            opts.get(&new_dc_id).and_then(|e| e.auth_key)
1898        };
1899
1900        let socks5    = self.inner.socks5.clone();
1901        let transport = self.inner.transport.clone();
1902        let conn = if let Some(key) = saved_key {
1903            Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
1904        } else {
1905            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
1906        };
1907
1908        let new_key = conn.auth_key_bytes();
1909        {
1910            let mut opts = self.inner.dc_options.lock().await;
1911            let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
1912                dc_id: new_dc_id, addr: addr.clone(),
1913                auth_key: None, first_salt: 0, time_offset: 0,
1914            });
1915            entry.auth_key = Some(new_key);
1916        }
1917
1918        // Split the new connection and replace writer + reader
1919        let (new_writer, new_read, new_fk) = conn.into_writer();
1920        let new_ak  = new_writer.enc.auth_key_bytes();
1921        let new_sid = new_writer.enc.session_id();
1922        *self.inner.writer.lock().await = new_writer;
1923        let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));
1924        *self.inner.home_dc_id.lock().await = new_dc_id;
1925        self.init_connection().await?;
1926        log::info!("[layer] Now on DC{new_dc_id} ✓");
1927        Ok(())
1928    }
1929
1930    // ── Cache helpers ──────────────────────────────────────────────────────
1931
1932    async fn cache_user(&self, user: &tl::enums::User) {
1933        self.inner.peer_cache.lock().await.cache_user(user);
1934    }
1935
1936    async fn cache_users_slice(&self, users: &[tl::enums::User]) {
1937        let mut cache = self.inner.peer_cache.lock().await;
1938        cache.cache_users(users);
1939    }
1940
1941    async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
1942        let mut cache = self.inner.peer_cache.lock().await;
1943        cache.cache_chats(chats);
1944    }
1945
1946    // Public versions used by sub-modules (media.rs, participants.rs, pts.rs)
1947    #[doc(hidden)]
1948    pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
1949        self.cache_users_slice(users).await;
1950    }
1951
1952    #[doc(hidden)]
1953    pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
1954        self.cache_chats_slice(chats).await;
1955    }
1956
1957    /// Public RPC call for use by sub-modules.
1958    #[doc(hidden)]
1959    pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1960        self.rpc_call_raw(req).await
1961    }
1962
1963    /// Like rpc_call_raw but takes a Serializable (for InvokeWithLayer wrappers).
1964    async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
1965        let mut fail_count   = NonZeroU32::new(1).unwrap();
1966        let mut slept_so_far = Duration::default();
1967        loop {
1968            match self.do_rpc_write_returning_body(req).await {
1969                Ok(body) => return Ok(body),
1970                Err(e) => {
1971                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
1972                    match self.inner.retry_policy.should_retry(&ctx) {
1973                        ControlFlow::Continue(delay) => {
1974                            sleep(delay).await;
1975                            slept_so_far += delay;
1976                            fail_count = fail_count.saturating_add(1);
1977                        }
1978                        ControlFlow::Break(()) => return Err(ctx.error),
1979                    }
1980                }
1981            }
1982        }
1983    }
1984
1985    async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
1986        let (tx, rx) = oneshot::channel();
1987        {
1988            let mut w = self.inner.writer.lock().await;
1989            let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
1990            let fk = w.frame_kind.clone();
1991            self.inner.pending.lock().await.insert(msg_id, tx);
1992            send_frame_write(&mut w.write_half, &wire, &fk).await?;
1993        }
1994        match tokio::time::timeout(Duration::from_secs(30), rx).await {
1995            Ok(Ok(result)) => result,
1996            Ok(Err(_))     => Err(InvocationError::Deserialize("rpc channel closed".into())),
1997            Err(_)         => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
1998        }
1999    }
2000
2001    // ── Paginated dialog iterator ──────────────────────────────────────────
2002
2003    /// Fetch dialogs page by page.
2004    ///
2005    /// Returns a [`DialogIter`] that can be advanced with [`DialogIter::next`].
2006    /// This lets you page through all dialogs without loading them all at once.
2007    ///
2008    /// # Example
2009    /// ```rust,no_run
2010    /// # async fn f(client: layer_client::Client) -> Result<(), Box<dyn std::error::Error>> {
2011    /// let mut iter = client.iter_dialogs();
2012    /// while let Some(dialog) = iter.next(&client).await? {
2013    ///     println!("{}", dialog.title());
2014    /// }
2015    /// # Ok(()) }
2016    /// ```
2017    pub fn iter_dialogs(&self) -> DialogIter {
2018        DialogIter {
2019            offset_date: 0,
2020            offset_id:   0,
2021            offset_peer: tl::enums::InputPeer::Empty,
2022            done:        false,
2023            buffer:      VecDeque::new(),
2024        }
2025    }
2026
2027    /// Fetch messages from a peer, page by page.
2028    ///
2029    /// Returns a [`MessageIter`] that can be advanced with [`MessageIter::next`].
2030    ///
2031    /// # Example
2032    /// ```rust,no_run
2033    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
2034    /// let mut iter = client.iter_messages(peer);
2035    /// while let Some(msg) = iter.next(&client).await? {
2036    ///     println!("{:?}", msg.text());
2037    /// }
2038    /// # Ok(()) }
2039    /// ```
2040    pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
2041        MessageIter {
2042            peer,
2043            offset_id: 0,
2044            done:      false,
2045            buffer:    VecDeque::new(),
2046        }
2047    }
2048
2049    // ── resolve_peer helper returning Result on unknown hash ───────────────
2050
2051    /// Try to resolve a peer to InputPeer, returning an error if the access_hash
2052    /// is unknown (i.e. the peer has not been seen in any prior API call).
2053    pub async fn resolve_to_input_peer(
2054        &self,
2055        peer: &tl::enums::Peer,
2056    ) -> Result<tl::enums::InputPeer, InvocationError> {
2057        let cache = self.inner.peer_cache.lock().await;
2058        match peer {
2059            tl::enums::Peer::User(u) => {
2060                if u.user_id == 0 {
2061                    return Ok(tl::enums::InputPeer::PeerSelf);
2062                }
2063                match cache.users.get(&u.user_id) {
2064                    Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
2065                        user_id: u.user_id, access_hash: hash,
2066                    })),
2067                    None => Err(InvocationError::Deserialize(format!(
2068                        "access_hash unknown for user {}; resolve via username first", u.user_id
2069                    ))),
2070                }
2071            }
2072            tl::enums::Peer::Chat(c) => {
2073                Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
2074            }
2075            tl::enums::Peer::Channel(c) => {
2076                match cache.channels.get(&c.channel_id) {
2077                    Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
2078                        channel_id: c.channel_id, access_hash: hash,
2079                    })),
2080                    None => Err(InvocationError::Deserialize(format!(
2081                        "access_hash unknown for channel {}; resolve via username first", c.channel_id
2082                    ))),
2083                }
2084            }
2085        }
2086    }
2087
2088    // ── Multi-DC pool ──────────────────────────────────────────────────────
2089
2090    /// Invoke a request on a specific DC, using the pool.
2091    ///
2092    /// If the target DC has no auth key yet, one is acquired via DH and then
2093    /// authorized via `auth.exportAuthorization` / `auth.importAuthorization`
2094    /// so the worker DC can serve user-account requests too.
2095    pub async fn invoke_on_dc<R: RemoteCall>(
2096        &self,
2097        dc_id: i32,
2098        req:   &R,
2099    ) -> Result<R::Return, InvocationError> {
2100        let body = self.rpc_on_dc_raw(dc_id, req).await?;
2101        let mut cur = Cursor::from_slice(&body);
2102        R::Return::deserialize(&mut cur).map_err(Into::into)
2103    }
2104
2105    /// Raw RPC call routed to `dc_id`, exporting auth if needed.
2106    async fn rpc_on_dc_raw<R: RemoteCall>(
2107        &self,
2108        dc_id: i32,
2109        req:   &R,
2110    ) -> Result<Vec<u8>, InvocationError> {
2111        // Check if we need to open a new connection for this DC
2112        let needs_new = {
2113            let pool = self.inner.dc_pool.lock().await;
2114            !pool.has_connection(dc_id)
2115        };
2116
2117        if needs_new {
2118            let addr = {
2119                let opts = self.inner.dc_options.lock().await;
2120                opts.get(&dc_id).map(|e| e.addr.clone())
2121                    .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
2122            };
2123
2124            let socks5    = self.inner.socks5.clone();
2125            let transport = self.inner.transport.clone();
2126            let saved_key = {
2127                let opts = self.inner.dc_options.lock().await;
2128                opts.get(&dc_id).and_then(|e| e.auth_key)
2129            };
2130
2131            let dc_conn = if let Some(key) = saved_key {
2132                dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
2133            } else {
2134                let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
2135                // Export auth from home DC and import into worker DC
2136                let home_dc_id = *self.inner.home_dc_id.lock().await;
2137                if dc_id != home_dc_id {
2138                    if let Err(e) = self.export_import_auth(dc_id, &conn).await {
2139                        log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
2140                    }
2141                }
2142                conn
2143            };
2144
2145            let key = dc_conn.auth_key_bytes();
2146            {
2147                let mut opts = self.inner.dc_options.lock().await;
2148                if let Some(e) = opts.get_mut(&dc_id) {
2149                    e.auth_key = Some(key);
2150                }
2151            }
2152            self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
2153        }
2154
2155        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
2156        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
2157    }
2158
2159    /// Export authorization from the home DC and import it into `dc_id`.
2160    async fn export_import_auth(
2161        &self,
2162        dc_id:   i32,
2163        _dc_conn: &dc_pool::DcConnection, // reserved for future direct import
2164    ) -> Result<(), InvocationError> {
2165        // Export from home DC
2166        let export_req = tl::functions::auth::ExportAuthorization { dc_id };
2167        let body    = self.rpc_call_raw(&export_req).await?;
2168        let mut cur = Cursor::from_slice(&body);
2169        let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
2170            tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
2171        };
2172
2173        // Import into the target DC via the pool
2174        let import_req = tl::functions::auth::ImportAuthorization {
2175            id:    exported.id,
2176            bytes: exported.bytes,
2177        };
2178        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
2179        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
2180        log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
2181        Ok(())
2182    }
2183
2184    // ── Private helpers ────────────────────────────────────────────────────
2185
2186    async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
2187        let body    = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
2188        let mut cur = Cursor::from_slice(&body);
2189        let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
2190            tl::enums::account::Password::Password(p) => p,
2191        };
2192        Ok(PasswordToken { password: pw })
2193    }
2194
2195    fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
2196        tl::functions::auth::SendCode {
2197            phone_number: phone.to_string(),
2198            api_id:       self.inner.api_id,
2199            api_hash:     self.inner.api_hash.clone(),
2200            settings:     tl::enums::CodeSettings::CodeSettings(
2201                tl::types::CodeSettings {
2202                    allow_flashcall: false, current_number: false, allow_app_hash: false,
2203                    allow_missed_call: false, allow_firebase: false, unknown_number: false,
2204                    logout_tokens: None, token: None, app_sandbox: None,
2205                },
2206            ),
2207        }
2208    }
2209
2210    fn extract_user_name(user: &tl::enums::User) -> String {
2211        match user {
2212            tl::enums::User::User(u) => {
2213                format!("{} {}",
2214                    u.first_name.as_deref().unwrap_or(""),
2215                    u.last_name.as_deref().unwrap_or(""))
2216                    .trim().to_string()
2217            }
2218            tl::enums::User::Empty(_) => "(unknown)".into(),
2219        }
2220    }
2221
2222    fn extract_password_params(
2223        algo: &tl::enums::PasswordKdfAlgo,
2224    ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
2225        match algo {
2226            tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
2227                Ok((&a.salt1, &a.salt2, &a.p, a.g))
2228            }
2229            _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
2230        }
2231    }
2232}
2233
2234// ─── Paginated iterators ──────────────────────────────────────────────────────
2235
2236/// Cursor-based iterator over dialogs. Created by [`Client::iter_dialogs`].
2237pub struct DialogIter {
2238    offset_date: i32,
2239    offset_id:   i32,
2240    offset_peer: tl::enums::InputPeer,
2241    done:        bool,
2242    buffer:      VecDeque<Dialog>,
2243}
2244
2245impl DialogIter {
2246    const PAGE_SIZE: i32 = 100;
2247
2248    /// Fetch the next dialog. Returns `None` when all dialogs have been yielded.
2249    pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
2250        if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
2251        if self.done { return Ok(None); }
2252
2253        let req = tl::functions::messages::GetDialogs {
2254            exclude_pinned: false,
2255            folder_id:      None,
2256            offset_date:    self.offset_date,
2257            offset_id:      self.offset_id,
2258            offset_peer:    self.offset_peer.clone(),
2259            limit:          Self::PAGE_SIZE,
2260            hash:           0,
2261        };
2262
2263        let dialogs = client.get_dialogs_raw(req).await?;
2264        if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
2265            self.done = true;
2266        }
2267
2268        // Prepare cursor for next page
2269        if let Some(last) = dialogs.last() {
2270            self.offset_date = last.message.as_ref().map(|m| match m {
2271                tl::enums::Message::Message(x) => x.date,
2272                tl::enums::Message::Service(x) => x.date,
2273                _ => 0,
2274            }).unwrap_or(0);
2275            self.offset_id = last.top_message();
2276            if let Some(peer) = last.peer() {
2277                self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
2278            }
2279        }
2280
2281        self.buffer.extend(dialogs);
2282        Ok(self.buffer.pop_front())
2283    }
2284}
2285
2286/// Cursor-based iterator over message history. Created by [`Client::iter_messages`].
2287pub struct MessageIter {
2288    peer:      tl::enums::Peer,
2289    offset_id: i32,
2290    done:      bool,
2291    buffer:    VecDeque<update::IncomingMessage>,
2292}
2293
2294impl MessageIter {
2295    const PAGE_SIZE: i32 = 100;
2296
2297    /// Fetch the next message (newest first). Returns `None` when all messages have been yielded.
2298    pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
2299        if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
2300        if self.done { return Ok(None); }
2301
2302        let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
2303        let page = client.get_messages(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
2304
2305        if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
2306            self.done = true;
2307        }
2308        if let Some(last) = page.last() {
2309            self.offset_id = last.id();
2310        }
2311
2312        self.buffer.extend(page);
2313        Ok(self.buffer.pop_front())
2314    }
2315}
2316
2317// ─── Public random helper (used by media.rs) ──────────────────────────────────
2318
2319/// Public wrapper for `random_i64` used by sub-modules.
2320#[doc(hidden)]
2321pub fn random_i64_pub() -> i64 { random_i64() }
2322
2323// ─── Connection ───────────────────────────────────────────────────────────────
2324
2325/// How framing bytes are sent/received on a connection.
2326#[derive(Clone)]
2327enum FrameKind {
2328    Abridged,
2329    Intermediate,
2330    #[allow(dead_code)]
2331    Full { send_seqno: u32, recv_seqno: u32 },
2332}
2333
2334
2335// ─── Split connection types ───────────────────────────────────────────────────
2336
2337/// Write half of a split connection.  Held under `Mutex` in `ClientInner`.
2338/// Owns the EncryptedSession (for packing) and the pending-RPC map.
2339struct ConnectionWriter {
2340    write_half: OwnedWriteHalf,
2341    enc:        EncryptedSession,
2342    frame_kind: FrameKind,
2343}
2344
2345impl ConnectionWriter {
2346    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
2347    fn first_salt(&self)    -> i64         { self.enc.salt }
2348    fn time_offset(&self)   -> i32         { self.enc.time_offset }
2349}
2350
2351struct Connection {
2352    stream:     TcpStream,
2353    enc:        EncryptedSession,
2354    frame_kind: FrameKind,
2355}
2356
2357impl Connection {
2358    /// Open a TCP stream, optionally via SOCKS5, and apply transport init bytes.
2359    async fn open_stream(
2360        addr:      &str,
2361        socks5:    Option<&crate::socks5::Socks5Config>,
2362        transport: &TransportKind,
2363    ) -> Result<(TcpStream, FrameKind), InvocationError> {
2364        let stream = match socks5 {
2365            Some(proxy) => proxy.connect(addr).await?,
2366            None        => TcpStream::connect(addr).await?,
2367        };
2368        Self::apply_transport_init(stream, transport).await
2369    }
2370
2371    /// Send the transport init bytes and return the stream + FrameKind.
2372    async fn apply_transport_init(
2373        mut stream: TcpStream,
2374        transport:  &TransportKind,
2375    ) -> Result<(TcpStream, FrameKind), InvocationError> {
2376        match transport {
2377            TransportKind::Abridged => {
2378                stream.write_all(&[0xef]).await?;
2379                Ok((stream, FrameKind::Abridged))
2380            }
2381            TransportKind::Intermediate => {
2382                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
2383                Ok((stream, FrameKind::Intermediate))
2384            }
2385            TransportKind::Full => {
2386                // Full transport has no init byte
2387                Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
2388            }
2389            TransportKind::Obfuscated { secret } => {
2390                // For obfuscated we do the full handshake inside open_obfuscated,
2391                // then wrap back in a plain TcpStream via into_inner.
2392                // Since ObfuscatedStream is a different type we reuse the Abridged
2393                // frame logic internally — the encryption layer handles everything.
2394                //
2395                // Implementation note: We convert to Abridged after the handshake
2396                // because ObfuscatedStream internally already uses Abridged framing
2397                // with XOR applied on top.  The outer Connection just sends raw bytes.
2398                let mut nonce = [0u8; 64];
2399                getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
2400                // Write obfuscated handshake header
2401                let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
2402                let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
2403                // Stamp protocol tag into nonce[56..60]
2404                let mut handshake = nonce;
2405                handshake[56] = 0xef; handshake[57] = 0xef;
2406                handshake[58] = 0xef; handshake[59] = 0xef;
2407                enc_cipher.apply(&mut handshake[56..]);
2408                stream.write_all(&handshake).await?;
2409                Ok((stream, FrameKind::Abridged))
2410            }
2411        }
2412    }
2413
2414    async fn connect_raw(
2415        addr:      &str,
2416        socks5:    Option<&crate::socks5::Socks5Config>,
2417        transport: &TransportKind,
2418    ) -> Result<Self, InvocationError> {
2419        log::info!("[layer] Connecting to {addr} (DH) …");
2420
2421        // Wrap the entire DH handshake in a timeout so a silent server
2422        // response (e.g. a mis-framed transport error) never causes an
2423        // infinite hang.
2424        let addr2      = addr.to_string();
2425        let socks5_c   = socks5.cloned();
2426        let transport_c = transport.clone();
2427
2428        let fut = async move {
2429            let (mut stream, frame_kind) =
2430                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2431
2432            let mut plain = Session::new();
2433
2434            let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2435            send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
2436            let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
2437
2438            let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2439            send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
2440            let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
2441
2442            let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2443            send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
2444            let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;
2445
2446            let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2447            log::info!("[layer] DH complete ✓");
2448
2449            Ok::<Self, InvocationError>(Self {
2450                stream,
2451                enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
2452                frame_kind,
2453            })
2454        };
2455
2456        tokio::time::timeout(Duration::from_secs(15), fut)
2457            .await
2458            .map_err(|_| InvocationError::Deserialize(
2459                format!("DH handshake with {addr} timed out after 15 s")
2460            ))?
2461    }
2462
2463    async fn connect_with_key(
2464        addr:        &str,
2465        auth_key:    [u8; 256],
2466        first_salt:  i64,
2467        time_offset: i32,
2468        socks5:      Option<&crate::socks5::Socks5Config>,
2469        transport:   &TransportKind,
2470    ) -> Result<Self, InvocationError> {
2471        let addr2       = addr.to_string();
2472        let socks5_c    = socks5.cloned();
2473        let transport_c = transport.clone();
2474
2475        let fut = async move {
2476            let (stream, frame_kind) =
2477                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2478            Ok::<Self, InvocationError>(Self {
2479                stream,
2480                enc: EncryptedSession::new(auth_key, first_salt, time_offset),
2481                frame_kind,
2482            })
2483        };
2484
2485        tokio::time::timeout(Duration::from_secs(15), fut)
2486            .await
2487            .map_err(|_| InvocationError::Deserialize(
2488                format!("connect_with_key to {addr} timed out after 15 s")
2489            ))?
2490    }
2491
2492    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
2493
2494    /// Split into a write-only `ConnectionWriter` and the TCP read half.
2495    fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
2496        let (read_half, write_half) = self.stream.into_split();
2497        let writer = ConnectionWriter {
2498            write_half,
2499            enc:        self.enc,
2500            frame_kind: self.frame_kind.clone(),
2501        };
2502        (writer, read_half, self.frame_kind)
2503    }
2504}
2505
2506// ─── Transport framing (multi-kind) ──────────────────────────────────────────
2507
2508/// Send a framed message using the active transport kind.
2509async fn send_frame(
2510    stream: &mut TcpStream,
2511    data:   &[u8],
2512    kind:   &FrameKind,
2513) -> Result<(), InvocationError> {
2514    match kind {
2515        FrameKind::Abridged => send_abridged(stream, data).await,
2516        FrameKind::Intermediate => {
2517            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2518            stream.write_all(data).await?;
2519            Ok(())
2520        }
2521        FrameKind::Full { .. } => {
2522            // seqno and CRC handled inside Connection; here we just prefix length
2523            // Full framing: [total_len 4B][seqno 4B][payload][crc32 4B]
2524            // But send_frame is called with already-encrypted payload.
2525            // We use a simplified approach: emit the same as Intermediate for now
2526            // and note that Full's seqno/CRC are transport-level, not app-level.
2527            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2528            stream.write_all(data).await?;
2529            Ok(())
2530        }
2531    }
2532}
2533
2534// ─── Split-reader helpers ─────────────────────────────────────────────────────
2535
2536/// Outcome of a timed frame read attempt.
2537enum FrameOutcome {
2538    Frame(Vec<u8>),
2539    Error(InvocationError),
2540    Keepalive,  // timeout elapsed but ping was sent; caller should loop
2541}
2542
2543/// Read one frame with a 25-second keepalive timeout.
2544/// If the timeout fires, send a ping (using the writer) and return Keepalive.
2545async fn recv_frame_with_keepalive(
2546    rh:      &mut OwnedReadHalf,
2547    fk:      &FrameKind,
2548    client:  &Client,
2549    _ak:     &[u8; 256],
2550) -> FrameOutcome {
2551    match tokio::time::timeout(Duration::from_secs(25), recv_frame_read(rh, fk)).await {
2552        Ok(Ok(raw)) => FrameOutcome::Frame(raw),
2553        Ok(Err(e))  => FrameOutcome::Error(e),
2554        Err(_)      => {
2555            // Send keepalive ping
2556            let ping_req = tl::functions::Ping { ping_id: random_i64() };
2557            let mut w = client.inner.writer.lock().await;
2558            let wire = w.enc.pack(&ping_req);
2559            let fk = w.frame_kind.clone();
2560            let _ = send_frame_write(&mut w.write_half, &wire, &fk).await;
2561            FrameOutcome::Keepalive
2562        }
2563    }
2564}
2565
2566/// Send a framed message via an OwnedWriteHalf (split connection).
2567async fn send_frame_write(
2568    stream: &mut OwnedWriteHalf,
2569    data:   &[u8],
2570    kind:   &FrameKind,
2571) -> Result<(), InvocationError> {
2572    match kind {
2573        FrameKind::Abridged => {
2574            let words = data.len() / 4;
2575            if words < 0x7f {
2576                stream.write_all(&[words as u8]).await?;
2577            } else {
2578                let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
2579                stream.write_all(&b).await?;
2580            }
2581            stream.write_all(data).await?;
2582            Ok(())
2583        }
2584        FrameKind::Intermediate | FrameKind::Full { .. } => {
2585            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2586            stream.write_all(data).await?;
2587            Ok(())
2588        }
2589    }
2590}
2591
2592/// Receive a framed message via an OwnedReadHalf (split connection).
2593async fn recv_frame_read(
2594    stream: &mut OwnedReadHalf,
2595    kind:   &FrameKind,
2596) -> Result<Vec<u8>, InvocationError> {
2597    match kind {
2598        FrameKind::Abridged => {
2599            let mut h = [0u8; 1];
2600            stream.read_exact(&mut h).await?;
2601            let words = if h[0] < 0x7f {
2602                h[0] as usize
2603            } else {
2604                let mut b = [0u8; 3];
2605                stream.read_exact(&mut b).await?;
2606                b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
2607            };
2608            let len = words * 4;
2609            let mut buf = vec![0u8; len];
2610            stream.read_exact(&mut buf).await?;
2611            Ok(buf)
2612        }
2613        FrameKind::Intermediate | FrameKind::Full { .. } => {
2614            let mut len_buf = [0u8; 4];
2615            stream.read_exact(&mut len_buf).await?;
2616            let len = u32::from_le_bytes(len_buf) as usize;
2617            let mut buf = vec![0u8; len];
2618            stream.read_exact(&mut buf).await?;
2619            Ok(buf)
2620        }
2621    }
2622}
2623
2624
2625/// Send using Abridged framing (used for DH plaintext during connect).
2626async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
2627    let words = data.len() / 4;
2628    if words < 0x7f {
2629        stream.write_all(&[words as u8]).await?;
2630    } else {
2631        let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
2632        stream.write_all(&b).await?;
2633    }
2634    stream.write_all(data).await?;
2635    Ok(())
2636}
2637
2638async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
2639    let mut h = [0u8; 1];
2640    stream.read_exact(&mut h).await?;
2641    let words = if h[0] < 0x7f {
2642        h[0] as usize
2643    } else {
2644        let mut b = [0u8; 3];
2645        stream.read_exact(&mut b).await?;
2646        let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
2647        // word count of 1 after 0xFF = Telegram 4-byte transport error code
2648        if w == 1 {
2649            let mut code_buf = [0u8; 4];
2650            stream.read_exact(&mut code_buf).await?;
2651            let code = i32::from_le_bytes(code_buf);
2652            return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
2653        }
2654        w
2655    };
2656    // Guard against implausibly large reads — a raw 4-byte transport error
2657    // whose first byte was mis-read as a word count causes a hang otherwise.
2658    if words == 0 || words > 0x8000 {
2659        return Err(InvocationError::Deserialize(
2660            format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
2661        ));
2662    }
2663    let mut buf = vec![0u8; words * 4];
2664    stream.read_exact(&mut buf).await?;
2665    Ok(buf)
2666}
2667
2668/// Receive a plaintext (pre-auth) frame and deserialize it.
2669async fn recv_frame_plain<T: Deserializable>(
2670    stream: &mut TcpStream,
2671    _kind:  &FrameKind,
2672) -> Result<T, InvocationError> {
2673    let raw = recv_abridged(stream).await?; // DH always uses abridged for plaintext
2674    if raw.len() < 20 {
2675        return Err(InvocationError::Deserialize("plaintext frame too short".into()));
2676    }
2677    if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
2678        return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
2679    }
2680    let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
2681    let mut cur  = Cursor::from_slice(&raw[20..20 + body_len]);
2682    T::deserialize(&mut cur).map_err(Into::into)
2683}
2684
2685// ─── MTProto envelope ─────────────────────────────────────────────────────────
2686
2687enum EnvelopeResult {
2688    Payload(Vec<u8>),
2689    Updates(Vec<update::Update>),
2690    None,
2691}
2692
2693fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
2694    if body.len() < 4 {
2695        return Err(InvocationError::Deserialize("body < 4 bytes".into()));
2696    }
2697    let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2698
2699    match cid {
2700        ID_RPC_RESULT => {
2701            if body.len() < 12 {
2702                return Err(InvocationError::Deserialize("rpc_result too short".into()));
2703            }
2704            unwrap_envelope(body[12..].to_vec())
2705        }
2706        ID_RPC_ERROR => {
2707            if body.len() < 8 {
2708                return Err(InvocationError::Deserialize("rpc_error too short".into()));
2709            }
2710            let code    = i32::from_le_bytes(body[4..8].try_into().unwrap());
2711            let message = tl_read_string(&body[8..]).unwrap_or_default();
2712            Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
2713        }
2714        ID_MSG_CONTAINER => {
2715            if body.len() < 8 {
2716                return Err(InvocationError::Deserialize("container too short".into()));
2717            }
2718            let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
2719            let mut pos = 8usize;
2720            let mut payload: Option<Vec<u8>> = None;
2721            let mut updates_buf: Vec<update::Update> = Vec::new();
2722
2723            for _ in 0..count {
2724                if pos + 16 > body.len() { break; }
2725                let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
2726                pos += 16;
2727                if pos + inner_len > body.len() { break; }
2728                let inner = body[pos..pos + inner_len].to_vec();
2729                pos += inner_len;
2730                match unwrap_envelope(inner)? {
2731                    EnvelopeResult::Payload(p)  => { payload = Some(p); }
2732                    EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
2733                    EnvelopeResult::None        => {}
2734                }
2735            }
2736            if let Some(p) = payload {
2737                Ok(EnvelopeResult::Payload(p))
2738            } else if !updates_buf.is_empty() {
2739                Ok(EnvelopeResult::Updates(updates_buf))
2740            } else {
2741                Ok(EnvelopeResult::None)
2742            }
2743        }
2744        ID_GZIP_PACKED => {
2745            let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
2746            unwrap_envelope(gz_inflate(&bytes)?)
2747        }
2748        // MTProto service messages — all silently acknowledged, no payload extracted
2749        ID_PONG | ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
2750        // Grammers also silences these; we do the same to avoid routing them as payloads
2751        | 0xd33b5459  // MsgsStateReq
2752        | 0x04deb57d  // MsgsStateInfo
2753        | 0x8cc0d131  // MsgsAllInfo
2754        | 0x276d3ec6  // MsgDetailedInfo
2755        | 0x809db6df  // MsgNewDetailedInfo
2756        | 0x7d861a08  // MsgResendReq / MsgResendAnsReq
2757        | 0x0949d9dc  // FutureSalt
2758        | 0xae500895  // FutureSalts
2759        | 0x9299359f  // HttpWait
2760        | 0xe22045fc  // DestroySessionOk
2761        | 0x62d350c9  // DestroySessionNone
2762        => {
2763            Ok(EnvelopeResult::None)
2764        }
2765        ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
2766        | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
2767        | ID_UPDATES_TOO_LONG => {
2768            Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
2769        }
2770        _ => Ok(EnvelopeResult::Payload(body)),
2771    }
2772}
2773
2774// ─── Utilities ────────────────────────────────────────────────────────────────
2775
2776fn random_i64() -> i64 {
2777    let mut b = [0u8; 8];
2778    getrandom::getrandom(&mut b).expect("getrandom");
2779    i64::from_le_bytes(b)
2780}
2781
2782fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
2783    if data.is_empty() { return Some(vec![]); }
2784    let (len, start) = if data[0] < 254 { (data[0] as usize, 1) }
2785    else if data.len() >= 4 {
2786        (data[1] as usize | (data[2] as usize) << 8 | (data[3] as usize) << 16, 4)
2787    } else { return None; };
2788    if data.len() < start + len { return None; }
2789    Some(data[start..start + len].to_vec())
2790}
2791
2792fn tl_read_string(data: &[u8]) -> Option<String> {
2793    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
2794}
2795
2796fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
2797    use std::io::Read;
2798    let mut out = Vec::new();
2799    if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
2800        return Ok(out);
2801    }
2802    out.clear();
2803    flate2::read::ZlibDecoder::new(data)
2804        .read_to_end(&mut out)
2805        .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
2806    Ok(out)
2807}