Skip to main content

layer_client/
lib.rs

1//! # layer-client
2//!
3//! Production-grade async Telegram client built on MTProto.
4//!
5//! ## Features
6//! - User login (phone + code + 2FA SRP) and bot token login
7//! - Peer access-hash caching — API calls always carry correct access hashes
8//! - `FLOOD_WAIT` auto-retry with configurable policy
9//! - Typed async update stream: `NewMessage`, `MessageEdited`, `MessageDeleted`,
10//!   `CallbackQuery`, `InlineQuery`, `InlineSend`, `Raw`
11//! - Send / edit / delete / forward / pin messages
12//! - Search messages (per-chat and global)
13//! - Mark as read, delete dialogs, clear mentions
14//! - Join chat / accept invite links
15//! - Chat action (typing, uploading, …)
16//! - `get_me()` — fetch own User info
17//! - Paginated dialog and message iterators
18//! - DC migration, session persistence, reconnect
19
20#![deny(unsafe_code)]
21
22mod errors;
23mod retry;
24mod session;
25mod transport;
26mod two_factor_auth;
27pub mod update;
28pub mod parsers;
29pub mod media;
30pub mod participants;
31pub mod pts;
32
33// ── New feature modules ───────────────────────────────────────────────────────
34pub mod dc_pool;
35pub mod transport_obfuscated;
36pub mod transport_intermediate;
37pub mod socks5;
38pub mod session_backend;
39pub mod inline_iter;
40pub mod typing_guard;
41
42pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
43pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
44pub use update::Update;
45pub use media::{UploadedFile, DownloadIter};
46pub use participants::Participant;
47pub use typing_guard::TypingGuard;
48pub use socks5::Socks5Config;
49pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
50
51use std::collections::HashMap;
52use std::collections::VecDeque;
53use std::num::NonZeroU32;
54use std::ops::ControlFlow;
55use std::sync::Arc;
56use std::time::Duration;
57
58use layer_tl_types as tl;
59use layer_mtproto::{EncryptedSession, Session, authentication as auth};
60use layer_tl_types::{Cursor, Deserializable, RemoteCall};
61use session::{DcEntry, PersistedSession};
62use tokio::io::{AsyncReadExt, AsyncWriteExt};
63use tokio::net::TcpStream;
64use tokio::sync::{mpsc, Mutex};
65use tokio::time::sleep;
66
67// ─── MTProto envelope constructor IDs ────────────────────────────────────────
68
69const ID_RPC_RESULT:       u32 = 0xf35c6d01;
70const ID_RPC_ERROR:        u32 = 0x2144ca19;
71const ID_MSG_CONTAINER:    u32 = 0x73f1f8dc;
72const ID_GZIP_PACKED:      u32 = 0x3072cfa1;
73const ID_PONG:             u32 = 0x347773c5;
74const ID_MSGS_ACK:         u32 = 0x62d6b459;
75const ID_BAD_SERVER_SALT:  u32 = 0xedab447b;
76const ID_NEW_SESSION:      u32 = 0x9ec20908;
77const ID_BAD_MSG_NOTIFY:   u32 = 0xa7eff811;
78const ID_UPDATES:          u32 = 0x74ae4240;
79const ID_UPDATE_SHORT:     u32 = 0x78d4dec1;
80const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
81const ID_UPDATE_SHORT_MSG:      u32 = 0x313bc7f8;
82const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
83const ID_UPDATES_TOO_LONG:      u32 = 0xe317af7e;
84
85// ─── PeerCache ────────────────────────────────────────────────────────────────
86
87/// Caches access hashes for users and channels so every API call carries the
88/// correct hash without re-resolving peers.
89#[derive(Default)]
90pub(crate) struct PeerCache {
91    /// user_id → access_hash
92    pub(crate) users:    HashMap<i64, i64>,
93    /// channel_id → access_hash
94    pub(crate) channels: HashMap<i64, i64>,
95}
96
97impl PeerCache {
98    fn cache_user(&mut self, user: &tl::enums::User) {
99        if let tl::enums::User::User(u) = user {
100            if let Some(hash) = u.access_hash {
101                self.users.insert(u.id, hash);
102            }
103        }
104    }
105
106    fn cache_chat(&mut self, chat: &tl::enums::Chat) {
107        match chat {
108            tl::enums::Chat::Channel(c) => {
109                if let Some(hash) = c.access_hash {
110                    self.channels.insert(c.id, hash);
111                }
112            }
113            tl::enums::Chat::ChannelForbidden(c) => {
114                self.channels.insert(c.id, c.access_hash);
115            }
116            _ => {}
117        }
118    }
119
120    fn cache_users(&mut self, users: &[tl::enums::User]) {
121        for u in users { self.cache_user(u); }
122    }
123
124    fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
125        for c in chats { self.cache_chat(c); }
126    }
127
128    fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
129        if user_id == 0 {
130            return tl::enums::InputPeer::PeerSelf;
131        }
132        let hash = self.users.get(&user_id).copied().unwrap_or(0);
133        tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
134    }
135
136    fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
137        let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
138        tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
139    }
140
141    fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
142        match peer {
143            tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
144            tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
145                tl::types::InputPeerChat { chat_id: c.chat_id }
146            ),
147            tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
148        }
149    }
150}
151
152// ─── InputMessage builder ─────────────────────────────────────────────────────
153
154/// Builder for composing outgoing messages.
155///
156/// ```rust,no_run
157/// use layer_client::InputMessage;
158///
159/// let msg = InputMessage::text("Hello, *world*!")
160///     .silent(true)
161///     .reply_to(Some(42));
162/// ```
163#[derive(Clone, Default)]
164pub struct InputMessage {
165    pub text:         String,
166    pub reply_to:     Option<i32>,
167    pub silent:       bool,
168    pub background:   bool,
169    pub clear_draft:  bool,
170    pub no_webpage:   bool,
171    pub entities:     Option<Vec<tl::enums::MessageEntity>>,
172    pub reply_markup: Option<tl::enums::ReplyMarkup>,
173    pub schedule_date: Option<i32>,
174}
175
176impl InputMessage {
177    /// Create a message with the given text.
178    pub fn text(text: impl Into<String>) -> Self {
179        Self { text: text.into(), ..Default::default() }
180    }
181
182    /// Set the message text.
183    pub fn set_text(mut self, text: impl Into<String>) -> Self {
184        self.text = text.into(); self
185    }
186
187    /// Reply to a specific message ID.
188    pub fn reply_to(mut self, id: Option<i32>) -> Self {
189        self.reply_to = id; self
190    }
191
192    /// Send silently (no notification sound).
193    pub fn silent(mut self, v: bool) -> Self {
194        self.silent = v; self
195    }
196
197    /// Send in background.
198    pub fn background(mut self, v: bool) -> Self {
199        self.background = v; self
200    }
201
202    /// Clear the draft after sending.
203    pub fn clear_draft(mut self, v: bool) -> Self {
204        self.clear_draft = v; self
205    }
206
207    /// Disable link preview.
208    pub fn no_webpage(mut self, v: bool) -> Self {
209        self.no_webpage = v; self
210    }
211
212    /// Attach formatting entities (bold, italic, code, links, etc).
213    pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
214        self.entities = Some(e); self
215    }
216
217    /// Attach a reply markup (inline or reply keyboard).
218    pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
219        self.reply_markup = Some(rm); self
220    }
221
222    /// Schedule the message for a future Unix timestamp.
223    pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
224        self.schedule_date = ts; self
225    }
226
227    fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
228        self.reply_to.map(|id| {
229            tl::enums::InputReplyTo::Message(
230                tl::types::InputReplyToMessage {
231                    reply_to_msg_id: id,
232                    top_msg_id: None,
233                    reply_to_peer_id: None,
234                    quote_text: None,
235                    quote_entities: None,
236                    quote_offset: None,
237                    monoforum_peer_id: None,
238                    todo_item_id: None,
239                }
240            )
241        })
242    }
243}
244
245impl From<&str> for InputMessage {
246    fn from(s: &str) -> Self { Self::text(s) }
247}
248
249impl From<String> for InputMessage {
250    fn from(s: String) -> Self { Self::text(s) }
251}
252
253// ─── TransportKind ────────────────────────────────────────────────────────────
254
255/// Which MTProto transport framing to use for all connections.
256///
257/// | Variant | Init bytes | Notes |
258/// |---------|-----------|-------|
259/// | `Abridged` | `0xef` | Default, smallest overhead |
260/// | `Intermediate` | `0xeeeeeeee` | Better proxy compat |
261/// | `Full` | none | Adds seqno + CRC32 |
262/// | `Obfuscated` | random 64B | Bypasses DPI / MTProxy |
263#[derive(Clone, Debug, Default)]
264pub enum TransportKind {
265    /// MTProto [Abridged] transport — length prefix is 1 or 4 bytes.
266    ///
267    /// [Abridged]: https://core.telegram.org/mtproto/mtproto-transports#abridged
268    #[default]
269    Abridged,
270    /// MTProto [Intermediate] transport — 4-byte LE length prefix.
271    ///
272    /// [Intermediate]: https://core.telegram.org/mtproto/mtproto-transports#intermediate
273    Intermediate,
274    /// MTProto [Full] transport — 4-byte length + seqno + CRC32.
275    ///
276    /// [Full]: https://core.telegram.org/mtproto/mtproto-transports#full
277    Full,
278    /// [Obfuscated2] transport — XOR stream cipher over Abridged framing.
279    /// Required for MTProxy and networks with deep-packet inspection.
280    ///
281    /// `secret` is the 16-byte proxy secret, or `None` for keyless obfuscation.
282    ///
283    /// [Obfuscated2]: https://core.telegram.org/mtproto/mtproto-transports#obfuscated-2
284    Obfuscated { secret: Option<[u8; 16]> },
285}
286
287// ─── Config ───────────────────────────────────────────────────────────────────
288
289/// Configuration for [`Client::connect`].
290#[derive(Clone)]
291pub struct Config {
292    pub api_id:         i32,
293    pub api_hash:       String,
294    pub dc_addr:        Option<String>,
295    pub retry_policy:   Arc<dyn RetryPolicy>,
296    /// Optional SOCKS5 proxy — every Telegram connection is tunnelled through it.
297    pub socks5:         Option<crate::socks5::Socks5Config>,
298    /// Allow IPv6 DC addresses when populating the DC table (default: false).
299    pub allow_ipv6:     bool,
300    /// Which MTProto transport framing to use (default: Abridged).
301    pub transport:      TransportKind,
302    /// Session persistence backend (default: binary file `"layer.session"`).
303    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
304}
305
306impl Default for Config {
307    fn default() -> Self {
308        Self {
309            api_id:          0,
310            api_hash:        String::new(),
311            dc_addr:         None,
312            retry_policy:    Arc::new(AutoSleep::default()),
313            socks5:          None,
314            allow_ipv6:      false,
315            transport:       TransportKind::Abridged,
316            session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
317        }
318    }
319}
320
321// ─── UpdateStream ─────────────────────────────────────────────────────────────
322
323/// Asynchronous stream of [`Update`]s.
324pub struct UpdateStream {
325    rx: mpsc::UnboundedReceiver<update::Update>,
326}
327
328impl UpdateStream {
329    /// Wait for the next update. Returns `None` when the client has disconnected.
330    pub async fn next(&mut self) -> Option<update::Update> {
331        self.rx.recv().await
332    }
333}
334
335// ─── Dialog ───────────────────────────────────────────────────────────────────
336
337/// A Telegram dialog (chat, user, channel).
338#[derive(Debug, Clone)]
339pub struct Dialog {
340    pub raw:     tl::enums::Dialog,
341    pub message: Option<tl::enums::Message>,
342    pub entity:  Option<tl::enums::User>,
343    pub chat:    Option<tl::enums::Chat>,
344}
345
346impl Dialog {
347    /// The dialog's display title.
348    pub fn title(&self) -> String {
349        if let Some(tl::enums::User::User(u)) = &self.entity {
350            let first = u.first_name.as_deref().unwrap_or("");
351            let last  = u.last_name.as_deref().unwrap_or("");
352            let name  = format!("{first} {last}").trim().to_string();
353            if !name.is_empty() { return name; }
354        }
355        if let Some(chat) = &self.chat {
356            return match chat {
357                tl::enums::Chat::Chat(c)         => c.title.clone(),
358                tl::enums::Chat::Forbidden(c) => c.title.clone(),
359                tl::enums::Chat::Channel(c)      => c.title.clone(),
360                tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
361                tl::enums::Chat::Empty(_)        => "(empty)".into(),
362            };
363        }
364        "(Unknown)".to_string()
365    }
366
367    /// Peer of this dialog.
368    pub fn peer(&self) -> Option<&tl::enums::Peer> {
369        match &self.raw {
370            tl::enums::Dialog::Dialog(d) => Some(&d.peer),
371            tl::enums::Dialog::Folder(_) => None,
372        }
373    }
374
375    /// Unread message count.
376    pub fn unread_count(&self) -> i32 {
377        match &self.raw {
378            tl::enums::Dialog::Dialog(d) => d.unread_count,
379            _ => 0,
380        }
381    }
382
383    /// ID of the top message.
384    pub fn top_message(&self) -> i32 {
385        match &self.raw {
386            tl::enums::Dialog::Dialog(d) => d.top_message,
387            _ => 0,
388        }
389    }
390}
391
392// ─── ClientInner ─────────────────────────────────────────────────────────────
393
394struct ClientInner {
395    conn:            Mutex<Connection>,
396    home_dc_id:      Mutex<i32>,
397    dc_options:      Mutex<HashMap<i32, DcEntry>>,
398    pub(crate) peer_cache:    Mutex<PeerCache>,
399    pub(crate) pts_state:     Mutex<pts::PtsState>,
400    api_id:          i32,
401    api_hash:        String,
402    retry_policy:    Arc<dyn RetryPolicy>,
403    socks5:          Option<crate::socks5::Socks5Config>,
404    allow_ipv6:      bool,
405    transport:       TransportKind,
406    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
407    dc_pool:         Mutex<dc_pool::DcPool>,
408    update_tx:       mpsc::UnboundedSender<update::Update>,
409}
410
411/// The main Telegram client. Cheap to clone — internally Arc-wrapped.
412#[derive(Clone)]
413pub struct Client {
414    pub(crate) inner: Arc<ClientInner>,
415    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
416}
417
418impl Client {
419    // ── Connect ────────────────────────────────────────────────────────────
420
421    pub async fn connect(config: Config) -> Result<Self, InvocationError> {
422        let (update_tx, update_rx) = mpsc::unbounded_channel();
423
424        // ── Load or fresh-connect ───────────────────────────────────────
425        let socks5    = config.socks5.clone();
426        let transport = config.transport.clone();
427
428        let (conn, home_dc_id, dc_opts) =
429            match config.session_backend.load()
430                .map_err(InvocationError::Io)?
431            {
432                Some(s) => {
433                    if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
434                        if let Some(key) = dc.auth_key {
435                            log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
436                            match Connection::connect_with_key(
437                                &dc.addr, key, dc.first_salt, dc.time_offset,
438                                socks5.as_ref(), &transport,
439                            ).await {
440                                Ok(c) => {
441                                    let mut opts = session::default_dc_addresses()
442                                        .into_iter()
443                                        .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
444                                        .collect::<HashMap<_, _>>();
445                                    for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
446                                    (c, s.home_dc_id, opts)
447                                }
448                                Err(e) => {
449                                    log::warn!("[layer] Session connect failed ({e}), fresh connect …");
450                                    Self::fresh_connect(socks5.as_ref(), &transport).await?
451                                }
452                            }
453                        } else {
454                            Self::fresh_connect(socks5.as_ref(), &transport).await?
455                        }
456                    } else {
457                        Self::fresh_connect(socks5.as_ref(), &transport).await?
458                    }
459                }
460                None => Self::fresh_connect(socks5.as_ref(), &transport).await?,
461            };
462
463        // ── Build DC pool ───────────────────────────────────────────────
464        let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());
465
466        let inner = Arc::new(ClientInner {
467            conn:            Mutex::new(conn),
468            home_dc_id:      Mutex::new(home_dc_id),
469            dc_options:      Mutex::new(dc_opts),
470            peer_cache:      Mutex::new(PeerCache::default()),
471            pts_state:       Mutex::new(pts::PtsState::default()),
472            api_id:          config.api_id,
473            api_hash:        config.api_hash,
474            retry_policy:    config.retry_policy,
475            socks5:          config.socks5,
476            allow_ipv6:      config.allow_ipv6,
477            transport:       config.transport,
478            session_backend: config.session_backend,
479            dc_pool:         Mutex::new(pool),
480            update_tx:       update_tx,
481        });
482
483        let client = Self {
484            inner,
485            _update_rx: Arc::new(Mutex::new(update_rx)),
486        };
487
488        // If init_connection fails (e.g. stale auth key rejected by Telegram),
489        // drop the saved connection, do a fresh DH handshake, and retry once.
490        // This mirrors what grammers does: on a 404/bad-auth response it reconnects
491        // with a brand-new auth key rather than giving up or hanging.
492        if let Err(e) = client.init_connection().await {
493            log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");
494
495            let socks5_r    = client.inner.socks5.clone();
496            let transport_r = client.inner.transport.clone();
497            let (new_conn, new_dc_id, new_opts) =
498                Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;
499
500            {
501                let mut conn_guard = client.inner.conn.lock().await;
502                *conn_guard = new_conn;
503            }
504            {
505                let mut dc_guard = client.inner.home_dc_id.lock().await;
506                *dc_guard = new_dc_id;
507            }
508            {
509                let mut opts_guard = client.inner.dc_options.lock().await;
510                *opts_guard = new_opts;
511            }
512
513            client.init_connection().await?;
514        }
515
516        let _ = client.sync_pts_state().await;
517        Ok(client)
518    }
519
520    async fn fresh_connect(
521        socks5:    Option<&crate::socks5::Socks5Config>,
522        transport: &TransportKind,
523    ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
524        log::info!("[layer] Fresh connect to DC2 …");
525        let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
526        let opts = session::default_dc_addresses()
527            .into_iter()
528            .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
529            .collect();
530        Ok((conn, 2, opts))
531    }
532
533    // ── Session ────────────────────────────────────────────────────────────
534
535    pub async fn save_session(&self) -> Result<(), InvocationError> {
536        let conn_guard = self.inner.conn.lock().await;
537        let home_dc_id = *self.inner.home_dc_id.lock().await;
538        let dc_options = self.inner.dc_options.lock().await;
539
540        let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
541            dc_id:       e.dc_id,
542            addr:        e.addr.clone(),
543            auth_key:    if e.dc_id == home_dc_id { Some(conn_guard.auth_key_bytes()) } else { e.auth_key },
544            first_salt:  if e.dc_id == home_dc_id { conn_guard.first_salt() } else { e.first_salt },
545            time_offset: if e.dc_id == home_dc_id { conn_guard.time_offset() } else { e.time_offset },
546        }).collect();
547        // Collect auth keys from worker DCs in the pool
548        self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
549
550        self.inner.session_backend
551            .save(&PersistedSession { home_dc_id, dcs })
552            .map_err(InvocationError::Io)?;
553        log::info!("[layer] Session saved ✓");
554        Ok(())
555    }
556
557    // ── Auth ───────────────────────────────────────────────────────────────
558
559    /// Returns `true` if the client is already authorized.
560    pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
561        match self.invoke(&tl::functions::updates::GetState {}).await {
562            Ok(_)  => Ok(true),
563            Err(e) if e.is("AUTH_KEY_UNREGISTERED")
564                   || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
565            Err(e) => Err(e),
566        }
567    }
568
569    /// Sign in as a bot.
570    pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
571        let req = tl::functions::auth::ImportBotAuthorization {
572            flags: 0, api_id: self.inner.api_id,
573            api_hash: self.inner.api_hash.clone(),
574            bot_auth_token: token.to_string(),
575        };
576
577        let result = match self.invoke(&req).await {
578            Ok(x) => x,
579            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
580                let dc_id = r.value.unwrap_or(2) as i32;
581                self.migrate_to(dc_id).await?;
582                self.invoke(&req).await?
583            }
584            Err(e) => return Err(e),
585        };
586
587        let name = match result {
588            tl::enums::auth::Authorization::Authorization(a) => {
589                self.cache_user(&a.user).await;
590                Self::extract_user_name(&a.user)
591            }
592            tl::enums::auth::Authorization::SignUpRequired(_) => {
593                panic!("unexpected SignUpRequired during bot sign-in")
594            }
595        };
596        log::info!("[layer] Bot signed in ✓  ({name})");
597        Ok(name)
598    }
599
600    /// Request a login code for a user account.
601    pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
602        use tl::enums::auth::SentCode;
603
604        let req = self.make_send_code_req(phone);
605        let body = match self.rpc_call_raw(&req).await {
606            Ok(b) => b,
607            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
608                let dc_id = r.value.unwrap_or(2) as i32;
609                self.migrate_to(dc_id).await?;
610                self.rpc_call_raw(&req).await?
611            }
612            Err(e) => return Err(e),
613        };
614
615        let mut cur = Cursor::from_slice(&body);
616        let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
617            SentCode::SentCode(s) => s.phone_code_hash,
618            SentCode::Success(_)  => return Err(InvocationError::Deserialize("unexpected Success".into())),
619            SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
620        };
621        log::info!("[layer] Login code sent");
622        Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
623    }
624
625    /// Complete sign-in with the code sent to the phone.
626    pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
627        let req = tl::functions::auth::SignIn {
628            phone_number:    token.phone.clone(),
629            phone_code_hash: token.phone_code_hash.clone(),
630            phone_code:      Some(code.trim().to_string()),
631            email_verification: None,
632        };
633
634        let body = match self.rpc_call_raw(&req).await {
635            Ok(b) => b,
636            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
637                let dc_id = r.value.unwrap_or(2) as i32;
638                self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
639                self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
640            }
641            Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
642                let t = self.get_password_info().await.map_err(SignInError::Other)?;
643                return Err(SignInError::PasswordRequired(t));
644            }
645            Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
646            Err(e) => return Err(SignInError::Other(e)),
647        };
648
649        let mut cur = Cursor::from_slice(&body);
650        match tl::enums::auth::Authorization::deserialize(&mut cur)
651            .map_err(|e| SignInError::Other(e.into()))?
652        {
653            tl::enums::auth::Authorization::Authorization(a) => {
654                self.cache_user(&a.user).await;
655                let name = Self::extract_user_name(&a.user);
656                log::info!("[layer] Signed in ✓  Welcome, {name}!");
657                Ok(name)
658            }
659            tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
660        }
661    }
662
663    /// Complete 2FA login.
664    pub async fn check_password(
665        &self,
666        token:    PasswordToken,
667        password: impl AsRef<[u8]>,
668    ) -> Result<String, InvocationError> {
669        let pw   = token.password;
670        let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
671        let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
672        let g_b  = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
673        let a    = pw.secure_random;
674        let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
675
676        let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
677        let req = tl::functions::auth::CheckPassword {
678            password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
679                tl::types::InputCheckPasswordSrp {
680                    srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
681                },
682            ),
683        };
684
685        let body = self.rpc_call_raw(&req).await?;
686        let mut cur = Cursor::from_slice(&body);
687        match tl::enums::auth::Authorization::deserialize(&mut cur)? {
688            tl::enums::auth::Authorization::Authorization(a) => {
689                self.cache_user(&a.user).await;
690                let name = Self::extract_user_name(&a.user);
691                log::info!("[layer] 2FA ✓  Welcome, {name}!");
692                Ok(name)
693            }
694            tl::enums::auth::Authorization::SignUpRequired(_) =>
695                Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
696        }
697    }
698
699    /// Sign out and invalidate the current session.
700    pub async fn sign_out(&self) -> Result<bool, InvocationError> {
701        let req = tl::functions::auth::LogOut {};
702        match self.rpc_call_raw(&req).await {
703            Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
704            Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
705            Err(e) => Err(e),
706        }
707    }
708
709    // ── Get self ───────────────────────────────────────────────────────────
710
711    /// Fetch information about the logged-in user.
712    pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
713        let req = tl::functions::users::GetUsers {
714            id: vec![tl::enums::InputUser::UserSelf],
715        };
716        let body    = self.rpc_call_raw(&req).await?;
717        let mut cur = Cursor::from_slice(&body);
718        let users   = Vec::<tl::enums::User>::deserialize(&mut cur)?;
719        self.cache_users_slice(&users).await;
720        users.into_iter().find_map(|u| match u {
721            tl::enums::User::User(u) => Some(u),
722            _ => None,
723        }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
724    }
725
726    // ── Updates ────────────────────────────────────────────────────────────
727
728    /// Return an [`UpdateStream`] that yields incoming [`Update`]s.
729    pub fn stream_updates(&self) -> UpdateStream {
730        let (tx, rx) = mpsc::unbounded_channel();
731        let client = self.clone();
732        tokio::spawn(async move {
733            client.run_update_loop(tx).await;
734        });
735        UpdateStream { rx }
736    }
737
738    async fn run_update_loop(&self, tx: mpsc::UnboundedSender<update::Update>) {
739        loop {
740            let result = {
741                let mut conn = self.inner.conn.lock().await;
742                match tokio::time::timeout(Duration::from_secs(30), conn.recv_once()).await {
743                    Ok(Ok(updates)) => Ok(updates),
744                    Ok(Err(e))      => Err(e),
745                    Err(_timeout)   => {
746                        let _ = conn.send_ping().await;
747                        continue;
748                    }
749                }
750            };
751
752            match result {
753                Ok(updates) => {
754                    for u in updates { let _ = tx.send(u); }
755                }
756                Err(e) => {
757                    log::warn!("[layer] Update loop error: {e} — reconnecting …");
758                    sleep(Duration::from_secs(1)).await;
759                    let home_dc_id = *self.inner.home_dc_id.lock().await;
760                    let (addr, saved_key, first_salt, time_offset) = {
761                        let opts = self.inner.dc_options.lock().await;
762                        match opts.get(&home_dc_id) {
763                            Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
764                            None    => ("149.154.167.51:443".to_string(), None, 0, 0),
765                        }
766                    };
767                    let socks5    = self.inner.socks5.clone();
768                    let transport = self.inner.transport.clone();
769
770                    // Prefer reconnecting with the existing auth key (user is already
771                    // authorised on it).  Only fall back to a fresh DH if that fails.
772                    let new_conn_result = if let Some(key) = saved_key {
773                        log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
774                        match Connection::connect_with_key(
775                            &addr, key, first_salt, time_offset,
776                            socks5.as_ref(), &transport,
777                        ).await {
778                            Ok(c)  => Ok(c),
779                            Err(e2) => {
780                                log::warn!("[layer] connect_with_key failed ({e2}), falling back to fresh DH …");
781                                Connection::connect_raw(&addr, socks5.as_ref(), &transport).await
782                            }
783                        }
784                    } else {
785                        Connection::connect_raw(&addr, socks5.as_ref(), &transport).await
786                    };
787
788                    match new_conn_result {
789                        Ok(new_conn) => {
790                            *self.inner.conn.lock().await = new_conn;
791                            if let Err(e2) = self.init_connection().await {
792                                log::warn!("[layer] init_connection after reconnect failed: {e2}");
793                            }
794                            // Fetch any updates missed during disconnect
795                            match self.get_difference().await {
796                                Ok(missed) => {
797                                    for u in missed { let _ = tx.send(u); }
798                                }
799                                Err(e2) => log::warn!("[layer] getDifference after reconnect failed: {e2}"),
800                            }
801                        }
802                        Err(e2) => {
803                            log::error!("[layer] Reconnect failed: {e2}");
804                            break;
805                        }
806                    }
807                }
808            }
809        }
810    }
811
812    // ── Messaging ──────────────────────────────────────────────────────────
813
814    /// Send a text message. Use `"me"` for Saved Messages.
815    pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
816        let p = self.resolve_peer(peer).await?;
817        self.send_message_to_peer(p, text).await
818    }
819
820    /// Send a message to an already-resolved peer (plain text shorthand).
821    pub async fn send_message_to_peer(
822        &self,
823        peer: tl::enums::Peer,
824        text: &str,
825    ) -> Result<(), InvocationError> {
826        self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
827    }
828
829    /// Send a message with full [`InputMessage`] options.
830    pub async fn send_message_to_peer_ex(
831        &self,
832        peer: tl::enums::Peer,
833        msg:  &InputMessage,
834    ) -> Result<(), InvocationError> {
835        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
836        let req = tl::functions::messages::SendMessage {
837            no_webpage:               msg.no_webpage,
838            silent:                   msg.silent,
839            background:               msg.background,
840            clear_draft:              msg.clear_draft,
841            noforwards:               false,
842            update_stickersets_order: false,
843            invert_media:             false,
844            allow_paid_floodskip:     false,
845            peer:                     input_peer,
846            reply_to:                 msg.reply_header(),
847            message:                  msg.text.clone(),
848            random_id:                random_i64(),
849            reply_markup:             msg.reply_markup.clone(),
850            entities:                 msg.entities.clone(),
851            schedule_date:            msg.schedule_date,
852            schedule_repeat_period:   None,
853            send_as:                  None,
854            quick_reply_shortcut:     None,
855            effect:                   None,
856            allow_paid_stars:         None,
857            suggested_post:           None,
858        };
859        self.rpc_write(&req).await
860    }
861
862    /// Send directly to Saved Messages.
863    pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
864        let req = tl::functions::messages::SendMessage {
865            no_webpage:               false,
866            silent:                   false,
867            background:               false,
868            clear_draft:              false,
869            noforwards:               false,
870            update_stickersets_order: false,
871            invert_media:             false,
872            allow_paid_floodskip:     false,
873            peer:                     tl::enums::InputPeer::PeerSelf,
874            reply_to:                 None,
875            message:                  text.to_string(),
876            random_id:                random_i64(),
877            reply_markup:             None,
878            entities:                 None,
879            schedule_date:            None,
880            schedule_repeat_period:   None,
881            send_as:                  None,
882            quick_reply_shortcut:     None,
883            effect:                   None,
884            allow_paid_stars:         None,
885            suggested_post:           None,
886        };
887        self.rpc_write(&req).await
888    }
889
890    /// Edit an existing message.
891    pub async fn edit_message(
892        &self,
893        peer:       tl::enums::Peer,
894        message_id: i32,
895        new_text:   &str,
896    ) -> Result<(), InvocationError> {
897        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
898        let req = tl::functions::messages::EditMessage {
899            no_webpage:    false,
900            invert_media:  false,
901            peer:          input_peer,
902            id:            message_id,
903            message:       Some(new_text.to_string()),
904            media:         None,
905            reply_markup:  None,
906            entities:      None,
907            schedule_date: None,
908            schedule_repeat_period: None,
909            quick_reply_shortcut_id: None,
910        };
911        self.rpc_write(&req).await
912    }
913
914    /// Forward messages from `source` to `destination`.
915    pub async fn forward_messages(
916        &self,
917        destination: tl::enums::Peer,
918        message_ids: &[i32],
919        source:      tl::enums::Peer,
920    ) -> Result<(), InvocationError> {
921        let cache = self.inner.peer_cache.lock().await;
922        let to_peer   = cache.peer_to_input(&destination);
923        let from_peer = cache.peer_to_input(&source);
924        drop(cache);
925
926        let req = tl::functions::messages::ForwardMessages {
927            silent:            false,
928            background:        false,
929            with_my_score:     false,
930            drop_author:       false,
931            drop_media_captions: false,
932            noforwards:        false,
933            from_peer:         from_peer,
934            id:                message_ids.to_vec(),
935            random_id:         (0..message_ids.len()).map(|_| random_i64()).collect(),
936            to_peer:           to_peer,
937            top_msg_id:        None,
938            reply_to:          None,
939            schedule_date:     None,
940            schedule_repeat_period: None,
941            send_as:           None,
942            quick_reply_shortcut: None,
943            effect:            None,
944            video_timestamp:   None,
945            allow_paid_stars:  None,
946            allow_paid_floodskip: false,
947            suggested_post:    None,
948        };
949        self.rpc_write(&req).await
950    }
951
952    /// Delete messages by ID.
953    pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
954        let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
955        self.rpc_write(&req).await
956    }
957
958    /// Get messages by their IDs from a peer.
959    pub async fn get_messages_by_id(
960        &self,
961        peer: tl::enums::Peer,
962        ids:  &[i32],
963    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
964        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
965        let id_list: Vec<tl::enums::InputMessage> = ids.iter()
966            .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
967            .collect();
968        let req  = tl::functions::channels::GetMessages {
969            channel: match &input_peer {
970                tl::enums::InputPeer::Channel(c) =>
971                    tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
972                        channel_id: c.channel_id, access_hash: c.access_hash
973                    }),
974                _ => return self.get_messages_user(input_peer, id_list).await,
975            },
976            id: id_list,
977        };
978        let body    = self.rpc_call_raw(&req).await?;
979        let mut cur = Cursor::from_slice(&body);
980        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
981            tl::enums::messages::Messages::Messages(m) => m.messages,
982            tl::enums::messages::Messages::Slice(m)    => m.messages,
983            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
984            tl::enums::messages::Messages::NotModified(_) => vec![],
985        };
986        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
987    }
988
989    async fn get_messages_user(
990        &self,
991        _peer: tl::enums::InputPeer,
992        ids:   Vec<tl::enums::InputMessage>,
993    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
994        let req = tl::functions::messages::GetMessages { id: ids };
995        let body    = self.rpc_call_raw(&req).await?;
996        let mut cur = Cursor::from_slice(&body);
997        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
998            tl::enums::messages::Messages::Messages(m) => m.messages,
999            tl::enums::messages::Messages::Slice(m)    => m.messages,
1000            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1001            tl::enums::messages::Messages::NotModified(_) => vec![],
1002        };
1003        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1004    }
1005
1006    /// Get the pinned message in a chat.
1007    pub async fn get_pinned_message(
1008        &self,
1009        peer: tl::enums::Peer,
1010    ) -> Result<Option<update::IncomingMessage>, InvocationError> {
1011        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1012        let req = tl::functions::messages::Search {
1013            peer: input_peer,
1014            q: String::new(),
1015            from_id: None,
1016            saved_peer_id: None,
1017            saved_reaction: None,
1018            top_msg_id: None,
1019            filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
1020            min_date: 0,
1021            max_date: 0,
1022            offset_id: 0,
1023            add_offset: 0,
1024            limit: 1,
1025            max_id: 0,
1026            min_id: 0,
1027            hash: 0,
1028        };
1029        let body    = self.rpc_call_raw(&req).await?;
1030        let mut cur = Cursor::from_slice(&body);
1031        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1032            tl::enums::messages::Messages::Messages(m) => m.messages,
1033            tl::enums::messages::Messages::Slice(m)    => m.messages,
1034            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1035            tl::enums::messages::Messages::NotModified(_) => vec![],
1036        };
1037        Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
1038    }
1039
1040    /// Pin a message in a chat.
1041    pub async fn pin_message(
1042        &self,
1043        peer:       tl::enums::Peer,
1044        message_id: i32,
1045        silent:     bool,
1046        unpin:      bool,
1047        pm_oneside: bool,
1048    ) -> Result<(), InvocationError> {
1049        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1050        let req = tl::functions::messages::UpdatePinnedMessage {
1051            silent,
1052            unpin,
1053            pm_oneside,
1054            peer: input_peer,
1055            id:   message_id,
1056        };
1057        self.rpc_write(&req).await
1058    }
1059
1060    /// Unpin a specific message.
1061    pub async fn unpin_message(
1062        &self,
1063        peer:       tl::enums::Peer,
1064        message_id: i32,
1065    ) -> Result<(), InvocationError> {
1066        self.pin_message(peer, message_id, true, true, false).await
1067    }
1068
1069    /// Unpin all messages in a chat.
1070    pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1071        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1072        let req = tl::functions::messages::UnpinAllMessages {
1073            peer:      input_peer,
1074            top_msg_id: None,
1075            saved_peer_id: None,
1076        };
1077        self.rpc_write(&req).await
1078    }
1079
1080    // ── Message search ─────────────────────────────────────────────────────
1081
1082    /// Search messages in a chat.
1083    pub async fn search_messages(
1084        &self,
1085        peer:  tl::enums::Peer,
1086        query: &str,
1087        limit: i32,
1088    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1089        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1090        let req = tl::functions::messages::Search {
1091            peer:         input_peer,
1092            q:            query.to_string(),
1093            from_id:      None,
1094            saved_peer_id: None,
1095            saved_reaction: None,
1096            top_msg_id:   None,
1097            filter:       tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1098            min_date:     0,
1099            max_date:     0,
1100            offset_id:    0,
1101            add_offset:   0,
1102            limit,
1103            max_id:       0,
1104            min_id:       0,
1105            hash:         0,
1106        };
1107        let body    = self.rpc_call_raw(&req).await?;
1108        let mut cur = Cursor::from_slice(&body);
1109        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1110            tl::enums::messages::Messages::Messages(m) => m.messages,
1111            tl::enums::messages::Messages::Slice(m)    => m.messages,
1112            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1113            tl::enums::messages::Messages::NotModified(_) => vec![],
1114        };
1115        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1116    }
1117
1118    /// Search messages globally across all chats.
1119    pub async fn search_global(
1120        &self,
1121        query: &str,
1122        limit: i32,
1123    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1124        let req = tl::functions::messages::SearchGlobal {
1125            broadcasts_only: false,
1126            groups_only:     false,
1127            users_only:      false,
1128            folder_id:       None,
1129            q:               query.to_string(),
1130            filter:          tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1131            min_date:        0,
1132            max_date:        0,
1133            offset_rate:     0,
1134            offset_peer:     tl::enums::InputPeer::Empty,
1135            offset_id:       0,
1136            limit,
1137        };
1138        let body    = self.rpc_call_raw(&req).await?;
1139        let mut cur = Cursor::from_slice(&body);
1140        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1141            tl::enums::messages::Messages::Messages(m) => m.messages,
1142            tl::enums::messages::Messages::Slice(m)    => m.messages,
1143            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1144            tl::enums::messages::Messages::NotModified(_) => vec![],
1145        };
1146        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1147    }
1148
1149    // ── Scheduled messages ─────────────────────────────────────────────────
1150
1151    /// Retrieve all scheduled messages in a chat.
1152    ///
1153    /// Scheduled messages are messages set to be sent at a future time using
1154    /// [`InputMessage::schedule_date`].  Returns them newest-first.
1155    ///
1156    /// # Example
1157    /// ```rust,no_run
1158    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
1159    /// let scheduled = client.get_scheduled_messages(peer).await?;
1160    /// for msg in &scheduled {
1161    ///     println!("Scheduled: {:?} at {:?}", msg.text(), msg.date());
1162    /// }
1163    /// # Ok(()) }
1164    /// ```
1165    pub async fn get_scheduled_messages(
1166        &self,
1167        peer: tl::enums::Peer,
1168    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1169        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1170        let req = tl::functions::messages::GetScheduledHistory {
1171            peer: input_peer,
1172            hash: 0,
1173        };
1174        let body    = self.rpc_call_raw(&req).await?;
1175        let mut cur = Cursor::from_slice(&body);
1176        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1177            tl::enums::messages::Messages::Messages(m)        => m.messages,
1178            tl::enums::messages::Messages::Slice(m)           => m.messages,
1179            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1180            tl::enums::messages::Messages::NotModified(_)     => vec![],
1181        };
1182        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1183    }
1184
1185    /// Delete one or more scheduled messages by their IDs.
1186    pub async fn delete_scheduled_messages(
1187        &self,
1188        peer: tl::enums::Peer,
1189        ids:  Vec<i32>,
1190    ) -> Result<(), InvocationError> {
1191        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1192        let req = tl::functions::messages::DeleteScheduledMessages {
1193            peer: input_peer,
1194            id:   ids,
1195        };
1196        self.rpc_write(&req).await
1197    }
1198
1199    // ── Callback / Inline Queries ──────────────────────────────────────────
1200
1201    pub async fn answer_callback_query(
1202        &self,
1203        query_id: i64,
1204        text:     Option<&str>,
1205        alert:    bool,
1206    ) -> Result<bool, InvocationError> {
1207        let req = tl::functions::messages::SetBotCallbackAnswer {
1208            alert,
1209            query_id,
1210            message:    text.map(|s| s.to_string()),
1211            url:        None,
1212            cache_time: 0,
1213        };
1214        let body = self.rpc_call_raw(&req).await?;
1215        Ok(!body.is_empty())
1216    }
1217
1218    pub async fn answer_inline_query(
1219        &self,
1220        query_id:    i64,
1221        results:     Vec<tl::enums::InputBotInlineResult>,
1222        cache_time:  i32,
1223        is_personal: bool,
1224        next_offset: Option<String>,
1225    ) -> Result<bool, InvocationError> {
1226        let req = tl::functions::messages::SetInlineBotResults {
1227            gallery:        false,
1228            private:        is_personal,
1229            query_id,
1230            results,
1231            cache_time,
1232            next_offset,
1233            switch_pm:      None,
1234            switch_webview: None,
1235        };
1236        let body = self.rpc_call_raw(&req).await?;
1237        Ok(!body.is_empty())
1238    }
1239
1240    // ── Dialogs ────────────────────────────────────────────────────────────
1241
1242    /// Fetch up to `limit` dialogs, most recent first. Populates entity/message.
1243    pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
1244        let req = tl::functions::messages::GetDialogs {
1245            exclude_pinned: false,
1246            folder_id:      None,
1247            offset_date:    0,
1248            offset_id:      0,
1249            offset_peer:    tl::enums::InputPeer::Empty,
1250            limit,
1251            hash:           0,
1252        };
1253
1254        let body    = self.rpc_call_raw(&req).await?;
1255        let mut cur = Cursor::from_slice(&body);
1256        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1257            tl::enums::messages::Dialogs::Dialogs(d) => d,
1258            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
1259                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1260            },
1261            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1262        };
1263
1264        // Build message map
1265        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1266            .filter_map(|m| {
1267                let id = match &m {
1268                    tl::enums::Message::Message(x) => x.id,
1269                    tl::enums::Message::Service(x) => x.id,
1270                    tl::enums::Message::Empty(x)   => x.id,
1271                };
1272                Some((id, m))
1273            })
1274            .collect();
1275
1276        // Build user map
1277        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1278            .filter_map(|u| {
1279                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1280            })
1281            .collect();
1282
1283        // Build chat map
1284        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1285            .filter_map(|c| {
1286                let id = match &c {
1287                    tl::enums::Chat::Chat(x)             => x.id,
1288                    tl::enums::Chat::Forbidden(x)    => x.id,
1289                    tl::enums::Chat::Channel(x)          => x.id,
1290                    tl::enums::Chat::ChannelForbidden(x) => x.id,
1291                    tl::enums::Chat::Empty(x)            => x.id,
1292                };
1293                Some((id, c))
1294            })
1295            .collect();
1296
1297        // Cache peers for future access_hash lookups
1298        {
1299            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1300            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1301            self.cache_users_slice(&u_list).await;
1302            self.cache_chats_slice(&c_list).await;
1303        }
1304
1305        let result = raw.dialogs.into_iter().map(|d| {
1306            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1307            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1308
1309            let message = msg_map.get(&top_id).cloned();
1310            let entity = peer.and_then(|p| match p {
1311                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1312                _ => None,
1313            });
1314            let chat = peer.and_then(|p| match p {
1315                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
1316                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1317                _ => None,
1318            });
1319
1320            Dialog { raw: d, message, entity, chat }
1321        }).collect();
1322
1323        Ok(result)
1324    }
1325
1326    /// Internal helper: fetch dialogs with a custom GetDialogs request.
1327    async fn get_dialogs_raw(
1328        &self,
1329        req: tl::functions::messages::GetDialogs,
1330    ) -> Result<Vec<Dialog>, InvocationError> {
1331        let body    = self.rpc_call_raw(&req).await?;
1332        let mut cur = Cursor::from_slice(&body);
1333        let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1334            tl::enums::messages::Dialogs::Dialogs(d) => d,
1335            tl::enums::messages::Dialogs::Slice(d)   => tl::types::messages::Dialogs {
1336                dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1337            },
1338            tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1339        };
1340
1341        let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1342            .filter_map(|m| {
1343                let id = match &m {
1344                    tl::enums::Message::Message(x) => x.id,
1345                    tl::enums::Message::Service(x) => x.id,
1346                    tl::enums::Message::Empty(x)   => x.id,
1347                };
1348                Some((id, m))
1349            })
1350            .collect();
1351
1352        let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1353            .filter_map(|u| {
1354                if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1355            })
1356            .collect();
1357
1358        let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1359            .filter_map(|c| {
1360                let id = match &c {
1361                    tl::enums::Chat::Chat(x)             => x.id,
1362                    tl::enums::Chat::Forbidden(x)    => x.id,
1363                    tl::enums::Chat::Channel(x)          => x.id,
1364                    tl::enums::Chat::ChannelForbidden(x) => x.id,
1365                    tl::enums::Chat::Empty(x)            => x.id,
1366                };
1367                Some((id, c))
1368            })
1369            .collect();
1370
1371        {
1372            let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1373            let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1374            self.cache_users_slice(&u_list).await;
1375            self.cache_chats_slice(&c_list).await;
1376        }
1377
1378        let result = raw.dialogs.into_iter().map(|d| {
1379            let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1380            let peer   = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1381
1382            let message = msg_map.get(&top_id).cloned();
1383            let entity = peer.and_then(|p| match p {
1384                tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1385                _ => None,
1386            });
1387            let chat = peer.and_then(|p| match p {
1388                tl::enums::Peer::Chat(c)    => chat_map.get(&c.chat_id).cloned(),
1389                tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1390                _ => None,
1391            });
1392
1393            Dialog { raw: d, message, entity, chat }
1394        }).collect();
1395
1396        Ok(result)
1397    }
1398    pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1399        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1400        let req = tl::functions::messages::DeleteHistory {
1401            just_clear:  false,
1402            revoke:      false,
1403            peer:        input_peer,
1404            max_id:      0,
1405            min_date:    None,
1406            max_date:    None,
1407        };
1408        self.rpc_write(&req).await
1409    }
1410
1411    /// Mark all messages in a chat as read.
1412    pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1413        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1414        match &input_peer {
1415            tl::enums::InputPeer::Channel(c) => {
1416                let req = tl::functions::channels::ReadHistory {
1417                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1418                        channel_id: c.channel_id, access_hash: c.access_hash,
1419                    }),
1420                    max_id: 0,
1421                };
1422                self.rpc_call_raw(&req).await?;
1423            }
1424            _ => {
1425                let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
1426                self.rpc_call_raw(&req).await?;
1427            }
1428        }
1429        Ok(())
1430    }
1431
1432    /// Clear unread mention markers.
1433    pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1434        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1435        let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
1436        self.rpc_write(&req).await
1437    }
1438
1439    // ── Chat actions (typing, etc) ─────────────────────────────────────────
1440
1441    /// Send a chat action (typing indicator, uploading photo, etc).
1442    ///
1443    /// For "typing" use `tl::enums::SendMessageAction::Typing`.
1444    pub async fn send_chat_action(
1445        &self,
1446        peer:   tl::enums::Peer,
1447        action: tl::enums::SendMessageAction,
1448    ) -> Result<(), InvocationError> {
1449        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1450        let req = tl::functions::messages::SetTyping {
1451            peer: input_peer,
1452            top_msg_id: None,
1453            action,
1454        };
1455        self.rpc_write(&req).await
1456    }
1457
1458    // ── Join / invite links ────────────────────────────────────────────────
1459
1460    /// Join a public chat or channel by username/peer.
1461    pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1462        let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1463        match input_peer {
1464            tl::enums::InputPeer::Channel(c) => {
1465                let req = tl::functions::channels::JoinChannel {
1466                    channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1467                        channel_id: c.channel_id, access_hash: c.access_hash,
1468                    }),
1469                };
1470                self.rpc_call_raw(&req).await?;
1471            }
1472            tl::enums::InputPeer::Chat(c) => {
1473                let req = tl::functions::messages::AddChatUser {
1474                    chat_id:  c.chat_id,
1475                    user_id:  tl::enums::InputUser::UserSelf,
1476                    fwd_limit: 0,
1477                };
1478                self.rpc_call_raw(&req).await?;
1479            }
1480            _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
1481        }
1482        Ok(())
1483    }
1484
1485    /// Accept and join via an invite link.
1486    pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
1487        let hash = Self::parse_invite_hash(link)
1488            .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
1489        let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
1490        self.rpc_write(&req).await
1491    }
1492
1493    /// Extract hash from `https://t.me/+HASH` or `https://t.me/joinchat/HASH`.
1494    pub fn parse_invite_hash(link: &str) -> Option<&str> {
1495        if let Some(pos) = link.find("/+") {
1496            return Some(&link[pos + 2..]);
1497        }
1498        if let Some(pos) = link.find("/joinchat/") {
1499            return Some(&link[pos + 10..]);
1500        }
1501        None
1502    }
1503
1504    // ── Message history (paginated) ────────────────────────────────────────
1505
1506    /// Fetch a page of messages from a peer's history.
1507    pub async fn get_messages(
1508        &self,
1509        peer:      tl::enums::InputPeer,
1510        limit:     i32,
1511        offset_id: i32,
1512    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1513        let req = tl::functions::messages::GetHistory {
1514            peer, offset_id, offset_date: 0, add_offset: 0,
1515            limit, max_id: 0, min_id: 0, hash: 0,
1516        };
1517        let body    = self.rpc_call_raw(&req).await?;
1518        let mut cur = Cursor::from_slice(&body);
1519        let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1520            tl::enums::messages::Messages::Messages(m) => m.messages,
1521            tl::enums::messages::Messages::Slice(m)    => m.messages,
1522            tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1523            tl::enums::messages::Messages::NotModified(_) => vec![],
1524        };
1525        Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1526    }
1527
1528    // ── Peer resolution ────────────────────────────────────────────────────
1529
1530    /// Resolve a peer string to a [`tl::enums::Peer`].
1531    pub async fn resolve_peer(
1532        &self,
1533        peer: &str,
1534    ) -> Result<tl::enums::Peer, InvocationError> {
1535        match peer.trim() {
1536            "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
1537            username if username.starts_with('@') => {
1538                self.resolve_username(&username[1..]).await
1539            }
1540            id_str => {
1541                if let Ok(id) = id_str.parse::<i64>() {
1542                    Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
1543                } else {
1544                    Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
1545                }
1546            }
1547        }
1548    }
1549
1550    async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
1551        let req  = tl::functions::contacts::ResolveUsername {
1552            username: username.to_string(), referer: None,
1553        };
1554        let body = self.rpc_call_raw(&req).await?;
1555        let mut cur = Cursor::from_slice(&body);
1556        let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
1557            tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
1558        };
1559        // Cache users and chats from the resolution
1560        self.cache_users_slice(&resolved.users).await;
1561        self.cache_chats_slice(&resolved.chats).await;
1562        Ok(resolved.peer)
1563    }
1564
1565    // ── Raw invoke ─────────────────────────────────────────────────────────
1566
1567    /// Invoke any TL function directly, handling flood-wait retries.
1568    pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
1569        let body = self.rpc_call_raw(req).await?;
1570        let mut cur = Cursor::from_slice(&body);
1571        R::Return::deserialize(&mut cur).map_err(Into::into)
1572    }
1573
1574    async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1575        let mut fail_count   = NonZeroU32::new(1).unwrap();
1576        let mut slept_so_far = Duration::default();
1577        loop {
1578            match self.do_rpc_call(req).await {
1579                Ok(body) => return Ok(body),
1580                Err(e) => {
1581                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
1582                    match self.inner.retry_policy.should_retry(&ctx) {
1583                        ControlFlow::Continue(delay) => {
1584                            sleep(delay).await;
1585                            slept_so_far += delay;
1586                            fail_count = fail_count.saturating_add(1);
1587                        }
1588                        ControlFlow::Break(()) => return Err(ctx.error),
1589                    }
1590                }
1591            }
1592        }
1593    }
1594
1595    async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1596        let mut side_updates = Vec::new();
1597        let result = {
1598            let mut conn = self.inner.conn.lock().await;
1599            conn.rpc_call(req, &mut side_updates).await
1600        };
1601        for u in side_updates {
1602            let _ = self.inner.update_tx.send(u);
1603        }
1604        result
1605    }
1606
1607    /// Like `rpc_call_raw` but for write RPCs whose TL return type is `Updates`.
1608    /// Accepts either a normal payload or an `Updates` frame as success, so we
1609    /// don't hang when Telegram sends back an `updateShort` instead of a full result.
1610    async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
1611        let mut fail_count   = NonZeroU32::new(1).unwrap();
1612        let mut slept_so_far = Duration::default();
1613        loop {
1614            let mut side_updates = Vec::new();
1615            let result = {
1616                let mut conn = self.inner.conn.lock().await;
1617                conn.rpc_call_ack(req, &mut side_updates).await
1618            };
1619            for u in side_updates {
1620                let _ = self.inner.update_tx.send(u);
1621            }
1622            match result {
1623                Ok(()) => return Ok(()),
1624                Err(e) => {
1625                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
1626                    match self.inner.retry_policy.should_retry(&ctx) {
1627                        ControlFlow::Continue(delay) => {
1628                            sleep(delay).await;
1629                            slept_so_far += delay;
1630                            fail_count = fail_count.saturating_add(1);
1631                        }
1632                        ControlFlow::Break(()) => return Err(ctx.error),
1633                    }
1634                }
1635            }
1636        }
1637    }
1638
1639    // ── initConnection ─────────────────────────────────────────────────────
1640
1641    async fn init_connection(&self) -> Result<(), InvocationError> {
1642        use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
1643        let req = InvokeWithLayer {
1644            layer: tl::LAYER,
1645            query: InitConnection {
1646                api_id:           self.inner.api_id,
1647                device_model:     "Linux".to_string(),
1648                system_version:   "1.0".to_string(),
1649                app_version:      env!("CARGO_PKG_VERSION").to_string(),
1650                system_lang_code: "en".to_string(),
1651                lang_pack:        "".to_string(),
1652                lang_code:        "en".to_string(),
1653                proxy:            None,
1654                params:           None,
1655                query:            GetConfig {},
1656            },
1657        };
1658
1659        let body = {
1660            let mut side_updates = Vec::new();
1661            let result = {
1662                let mut conn = self.inner.conn.lock().await;
1663                conn.rpc_call_serializable(&req, &mut side_updates).await
1664            };
1665            for u in side_updates {
1666                let _ = self.inner.update_tx.send(u);
1667            }
1668            result?
1669        };
1670
1671        let mut cur = Cursor::from_slice(&body);
1672        if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
1673            let allow_ipv6 = self.inner.allow_ipv6;
1674            let mut opts = self.inner.dc_options.lock().await;
1675            for opt in &cfg.dc_options {
1676                let tl::enums::DcOption::DcOption(o) = opt;
1677                if o.media_only || o.cdn || o.tcpo_only { continue; }
1678                if o.ipv6 && !allow_ipv6 { continue; }
1679                let addr = format!("{}:{}", o.ip_address, o.port);
1680                let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
1681                    dc_id: o.id, addr: addr.clone(),
1682                    auth_key: None, first_salt: 0, time_offset: 0,
1683                });
1684                entry.addr = addr;
1685            }
1686            log::info!("[layer] initConnection ✓  ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
1687        }
1688        Ok(())
1689    }
1690
1691    // ── DC migration ───────────────────────────────────────────────────────
1692
1693    async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
1694        let addr = {
1695            let opts = self.inner.dc_options.lock().await;
1696            opts.get(&new_dc_id).map(|e| e.addr.clone())
1697                .unwrap_or_else(|| "149.154.167.51:443".to_string())
1698        };
1699        log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
1700
1701        let saved_key = {
1702            let opts = self.inner.dc_options.lock().await;
1703            opts.get(&new_dc_id).and_then(|e| e.auth_key)
1704        };
1705
1706        let socks5    = self.inner.socks5.clone();
1707        let transport = self.inner.transport.clone();
1708        let conn = if let Some(key) = saved_key {
1709            Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
1710        } else {
1711            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
1712        };
1713
1714        let new_key = conn.auth_key_bytes();
1715        {
1716            let mut opts = self.inner.dc_options.lock().await;
1717            let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
1718                dc_id: new_dc_id, addr: addr.clone(),
1719                auth_key: None, first_salt: 0, time_offset: 0,
1720            });
1721            entry.auth_key = Some(new_key);
1722        }
1723
1724        *self.inner.conn.lock().await = conn;
1725        *self.inner.home_dc_id.lock().await = new_dc_id;
1726        self.init_connection().await?;
1727        log::info!("[layer] Now on DC{new_dc_id} ✓");
1728        Ok(())
1729    }
1730
1731    // ── Cache helpers ──────────────────────────────────────────────────────
1732
1733    async fn cache_user(&self, user: &tl::enums::User) {
1734        self.inner.peer_cache.lock().await.cache_user(user);
1735    }
1736
1737    async fn cache_users_slice(&self, users: &[tl::enums::User]) {
1738        let mut cache = self.inner.peer_cache.lock().await;
1739        cache.cache_users(users);
1740    }
1741
1742    async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
1743        let mut cache = self.inner.peer_cache.lock().await;
1744        cache.cache_chats(chats);
1745    }
1746
1747    // Public versions used by sub-modules (media.rs, participants.rs, pts.rs)
1748    #[doc(hidden)]
1749    pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
1750        self.cache_users_slice(users).await;
1751    }
1752
1753    #[doc(hidden)]
1754    pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
1755        self.cache_chats_slice(chats).await;
1756    }
1757
1758    /// Public RPC call for use by sub-modules.
1759    #[doc(hidden)]
1760    pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1761        self.rpc_call_raw(req).await
1762    }
1763
1764    // ── Paginated dialog iterator ──────────────────────────────────────────
1765
1766    /// Fetch dialogs page by page.
1767    ///
1768    /// Returns a [`DialogIter`] that can be advanced with [`DialogIter::next`].
1769    /// This lets you page through all dialogs without loading them all at once.
1770    ///
1771    /// # Example
1772    /// ```rust,no_run
1773    /// # async fn f(client: layer_client::Client) -> Result<(), Box<dyn std::error::Error>> {
1774    /// let mut iter = client.iter_dialogs();
1775    /// while let Some(dialog) = iter.next(&client).await? {
1776    ///     println!("{}", dialog.title());
1777    /// }
1778    /// # Ok(()) }
1779    /// ```
1780    pub fn iter_dialogs(&self) -> DialogIter {
1781        DialogIter {
1782            offset_date: 0,
1783            offset_id:   0,
1784            offset_peer: tl::enums::InputPeer::Empty,
1785            done:        false,
1786            buffer:      VecDeque::new(),
1787        }
1788    }
1789
1790    /// Fetch messages from a peer, page by page.
1791    ///
1792    /// Returns a [`MessageIter`] that can be advanced with [`MessageIter::next`].
1793    ///
1794    /// # Example
1795    /// ```rust,no_run
1796    /// # async fn f(client: layer_client::Client, peer: layer_tl_types::enums::Peer) -> Result<(), Box<dyn std::error::Error>> {
1797    /// let mut iter = client.iter_messages(peer);
1798    /// while let Some(msg) = iter.next(&client).await? {
1799    ///     println!("{:?}", msg.text());
1800    /// }
1801    /// # Ok(()) }
1802    /// ```
1803    pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
1804        MessageIter {
1805            peer,
1806            offset_id: 0,
1807            done:      false,
1808            buffer:    VecDeque::new(),
1809        }
1810    }
1811
1812    // ── resolve_peer helper returning Result on unknown hash ───────────────
1813
1814    /// Try to resolve a peer to InputPeer, returning an error if the access_hash
1815    /// is unknown (i.e. the peer has not been seen in any prior API call).
1816    pub async fn resolve_to_input_peer(
1817        &self,
1818        peer: &tl::enums::Peer,
1819    ) -> Result<tl::enums::InputPeer, InvocationError> {
1820        let cache = self.inner.peer_cache.lock().await;
1821        match peer {
1822            tl::enums::Peer::User(u) => {
1823                if u.user_id == 0 {
1824                    return Ok(tl::enums::InputPeer::PeerSelf);
1825                }
1826                match cache.users.get(&u.user_id) {
1827                    Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
1828                        user_id: u.user_id, access_hash: hash,
1829                    })),
1830                    None => Err(InvocationError::Deserialize(format!(
1831                        "access_hash unknown for user {}; resolve via username first", u.user_id
1832                    ))),
1833                }
1834            }
1835            tl::enums::Peer::Chat(c) => {
1836                Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
1837            }
1838            tl::enums::Peer::Channel(c) => {
1839                match cache.channels.get(&c.channel_id) {
1840                    Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
1841                        channel_id: c.channel_id, access_hash: hash,
1842                    })),
1843                    None => Err(InvocationError::Deserialize(format!(
1844                        "access_hash unknown for channel {}; resolve via username first", c.channel_id
1845                    ))),
1846                }
1847            }
1848        }
1849    }
1850
1851    // ── Multi-DC pool ──────────────────────────────────────────────────────
1852
1853    /// Invoke a request on a specific DC, using the pool.
1854    ///
1855    /// If the target DC has no auth key yet, one is acquired via DH and then
1856    /// authorized via `auth.exportAuthorization` / `auth.importAuthorization`
1857    /// so the worker DC can serve user-account requests too.
1858    pub async fn invoke_on_dc<R: RemoteCall>(
1859        &self,
1860        dc_id: i32,
1861        req:   &R,
1862    ) -> Result<R::Return, InvocationError> {
1863        let body = self.rpc_on_dc_raw(dc_id, req).await?;
1864        let mut cur = Cursor::from_slice(&body);
1865        R::Return::deserialize(&mut cur).map_err(Into::into)
1866    }
1867
1868    /// Raw RPC call routed to `dc_id`, exporting auth if needed.
1869    async fn rpc_on_dc_raw<R: RemoteCall>(
1870        &self,
1871        dc_id: i32,
1872        req:   &R,
1873    ) -> Result<Vec<u8>, InvocationError> {
1874        // Check if we need to open a new connection for this DC
1875        let needs_new = {
1876            let pool = self.inner.dc_pool.lock().await;
1877            !pool.has_connection(dc_id)
1878        };
1879
1880        if needs_new {
1881            let addr = {
1882                let opts = self.inner.dc_options.lock().await;
1883                opts.get(&dc_id).map(|e| e.addr.clone())
1884                    .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
1885            };
1886
1887            let socks5    = self.inner.socks5.clone();
1888            let transport = self.inner.transport.clone();
1889            let saved_key = {
1890                let opts = self.inner.dc_options.lock().await;
1891                opts.get(&dc_id).and_then(|e| e.auth_key)
1892            };
1893
1894            let dc_conn = if let Some(key) = saved_key {
1895                dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
1896            } else {
1897                let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
1898                // Export auth from home DC and import into worker DC
1899                let home_dc_id = *self.inner.home_dc_id.lock().await;
1900                if dc_id != home_dc_id {
1901                    if let Err(e) = self.export_import_auth(dc_id, &conn).await {
1902                        log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
1903                    }
1904                }
1905                conn
1906            };
1907
1908            let key = dc_conn.auth_key_bytes();
1909            {
1910                let mut opts = self.inner.dc_options.lock().await;
1911                if let Some(e) = opts.get_mut(&dc_id) {
1912                    e.auth_key = Some(key);
1913                }
1914            }
1915            self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
1916        }
1917
1918        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
1919        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
1920    }
1921
1922    /// Export authorization from the home DC and import it into `dc_id`.
1923    async fn export_import_auth(
1924        &self,
1925        dc_id:   i32,
1926        _dc_conn: &dc_pool::DcConnection, // reserved for future direct import
1927    ) -> Result<(), InvocationError> {
1928        // Export from home DC
1929        let export_req = tl::functions::auth::ExportAuthorization { dc_id };
1930        let body    = self.rpc_call_raw(&export_req).await?;
1931        let mut cur = Cursor::from_slice(&body);
1932        let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
1933            tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
1934        };
1935
1936        // Import into the target DC via the pool
1937        let import_req = tl::functions::auth::ImportAuthorization {
1938            id:    exported.id,
1939            bytes: exported.bytes,
1940        };
1941        let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
1942        self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
1943        log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
1944        Ok(())
1945    }
1946
1947    // ── Private helpers ────────────────────────────────────────────────────
1948
1949    async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
1950        let body    = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
1951        let mut cur = Cursor::from_slice(&body);
1952        let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
1953            tl::enums::account::Password::Password(p) => p,
1954        };
1955        Ok(PasswordToken { password: pw })
1956    }
1957
1958    fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
1959        tl::functions::auth::SendCode {
1960            phone_number: phone.to_string(),
1961            api_id:       self.inner.api_id,
1962            api_hash:     self.inner.api_hash.clone(),
1963            settings:     tl::enums::CodeSettings::CodeSettings(
1964                tl::types::CodeSettings {
1965                    allow_flashcall: false, current_number: false, allow_app_hash: false,
1966                    allow_missed_call: false, allow_firebase: false, unknown_number: false,
1967                    logout_tokens: None, token: None, app_sandbox: None,
1968                },
1969            ),
1970        }
1971    }
1972
1973    fn extract_user_name(user: &tl::enums::User) -> String {
1974        match user {
1975            tl::enums::User::User(u) => {
1976                format!("{} {}",
1977                    u.first_name.as_deref().unwrap_or(""),
1978                    u.last_name.as_deref().unwrap_or(""))
1979                    .trim().to_string()
1980            }
1981            tl::enums::User::Empty(_) => "(unknown)".into(),
1982        }
1983    }
1984
1985    fn extract_password_params(
1986        algo: &tl::enums::PasswordKdfAlgo,
1987    ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
1988        match algo {
1989            tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
1990                Ok((&a.salt1, &a.salt2, &a.p, a.g))
1991            }
1992            _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
1993        }
1994    }
1995}
1996
1997// ─── Paginated iterators ──────────────────────────────────────────────────────
1998
1999/// Cursor-based iterator over dialogs. Created by [`Client::iter_dialogs`].
2000pub struct DialogIter {
2001    offset_date: i32,
2002    offset_id:   i32,
2003    offset_peer: tl::enums::InputPeer,
2004    done:        bool,
2005    buffer:      VecDeque<Dialog>,
2006}
2007
2008impl DialogIter {
2009    const PAGE_SIZE: i32 = 100;
2010
2011    /// Fetch the next dialog. Returns `None` when all dialogs have been yielded.
2012    pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
2013        if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
2014        if self.done { return Ok(None); }
2015
2016        let req = tl::functions::messages::GetDialogs {
2017            exclude_pinned: false,
2018            folder_id:      None,
2019            offset_date:    self.offset_date,
2020            offset_id:      self.offset_id,
2021            offset_peer:    self.offset_peer.clone(),
2022            limit:          Self::PAGE_SIZE,
2023            hash:           0,
2024        };
2025
2026        let dialogs = client.get_dialogs_raw(req).await?;
2027        if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
2028            self.done = true;
2029        }
2030
2031        // Prepare cursor for next page
2032        if let Some(last) = dialogs.last() {
2033            self.offset_date = last.message.as_ref().map(|m| match m {
2034                tl::enums::Message::Message(x) => x.date,
2035                tl::enums::Message::Service(x) => x.date,
2036                _ => 0,
2037            }).unwrap_or(0);
2038            self.offset_id = last.top_message();
2039            if let Some(peer) = last.peer() {
2040                self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
2041            }
2042        }
2043
2044        self.buffer.extend(dialogs);
2045        Ok(self.buffer.pop_front())
2046    }
2047}
2048
2049/// Cursor-based iterator over message history. Created by [`Client::iter_messages`].
2050pub struct MessageIter {
2051    peer:      tl::enums::Peer,
2052    offset_id: i32,
2053    done:      bool,
2054    buffer:    VecDeque<update::IncomingMessage>,
2055}
2056
2057impl MessageIter {
2058    const PAGE_SIZE: i32 = 100;
2059
2060    /// Fetch the next message (newest first). Returns `None` when all messages have been yielded.
2061    pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
2062        if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
2063        if self.done { return Ok(None); }
2064
2065        let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
2066        let page = client.get_messages(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
2067
2068        if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
2069            self.done = true;
2070        }
2071        if let Some(last) = page.last() {
2072            self.offset_id = last.id();
2073        }
2074
2075        self.buffer.extend(page);
2076        Ok(self.buffer.pop_front())
2077    }
2078}
2079
2080// ─── Public random helper (used by media.rs) ──────────────────────────────────
2081
2082/// Public wrapper for `random_i64` used by sub-modules.
2083#[doc(hidden)]
2084pub fn random_i64_pub() -> i64 { random_i64() }
2085
2086// ─── Connection ───────────────────────────────────────────────────────────────
2087
2088/// How framing bytes are sent/received on a connection.
2089enum FrameKind {
2090    Abridged,
2091    Intermediate,
2092    #[allow(dead_code)]
2093    Full { send_seqno: u32, recv_seqno: u32 },
2094}
2095
2096struct Connection {
2097    stream:     TcpStream,
2098    enc:        EncryptedSession,
2099    frame_kind: FrameKind,
2100}
2101
2102impl Connection {
2103    /// Open a TCP stream, optionally via SOCKS5, and apply transport init bytes.
2104    async fn open_stream(
2105        addr:      &str,
2106        socks5:    Option<&crate::socks5::Socks5Config>,
2107        transport: &TransportKind,
2108    ) -> Result<(TcpStream, FrameKind), InvocationError> {
2109        let stream = match socks5 {
2110            Some(proxy) => proxy.connect(addr).await?,
2111            None        => TcpStream::connect(addr).await?,
2112        };
2113        Self::apply_transport_init(stream, transport).await
2114    }
2115
2116    /// Send the transport init bytes and return the stream + FrameKind.
2117    async fn apply_transport_init(
2118        mut stream: TcpStream,
2119        transport:  &TransportKind,
2120    ) -> Result<(TcpStream, FrameKind), InvocationError> {
2121        match transport {
2122            TransportKind::Abridged => {
2123                stream.write_all(&[0xef]).await?;
2124                Ok((stream, FrameKind::Abridged))
2125            }
2126            TransportKind::Intermediate => {
2127                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
2128                Ok((stream, FrameKind::Intermediate))
2129            }
2130            TransportKind::Full => {
2131                // Full transport has no init byte
2132                Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
2133            }
2134            TransportKind::Obfuscated { secret } => {
2135                // For obfuscated we do the full handshake inside open_obfuscated,
2136                // then wrap back in a plain TcpStream via into_inner.
2137                // Since ObfuscatedStream is a different type we reuse the Abridged
2138                // frame logic internally — the encryption layer handles everything.
2139                //
2140                // Implementation note: We convert to Abridged after the handshake
2141                // because ObfuscatedStream internally already uses Abridged framing
2142                // with XOR applied on top.  The outer Connection just sends raw bytes.
2143                let mut nonce = [0u8; 64];
2144                getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
2145                // Write obfuscated handshake header
2146                let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
2147                let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
2148                // Stamp protocol tag into nonce[56..60]
2149                let mut handshake = nonce;
2150                handshake[56] = 0xef; handshake[57] = 0xef;
2151                handshake[58] = 0xef; handshake[59] = 0xef;
2152                enc_cipher.apply(&mut handshake[56..]);
2153                stream.write_all(&handshake).await?;
2154                Ok((stream, FrameKind::Abridged))
2155            }
2156        }
2157    }
2158
2159    async fn connect_raw(
2160        addr:      &str,
2161        socks5:    Option<&crate::socks5::Socks5Config>,
2162        transport: &TransportKind,
2163    ) -> Result<Self, InvocationError> {
2164        log::info!("[layer] Connecting to {addr} (DH) …");
2165
2166        // Wrap the entire DH handshake in a timeout so a silent server
2167        // response (e.g. a mis-framed transport error) never causes an
2168        // infinite hang.
2169        let addr2      = addr.to_string();
2170        let socks5_c   = socks5.cloned();
2171        let transport_c = transport.clone();
2172
2173        let fut = async move {
2174            let (mut stream, frame_kind) =
2175                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2176
2177            let mut plain = Session::new();
2178
2179            let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2180            send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
2181            let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
2182
2183            let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2184            send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
2185            let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
2186
2187            let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2188            send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
2189            let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;
2190
2191            let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2192            log::info!("[layer] DH complete ✓");
2193
2194            Ok::<Self, InvocationError>(Self {
2195                stream,
2196                enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
2197                frame_kind,
2198            })
2199        };
2200
2201        tokio::time::timeout(Duration::from_secs(15), fut)
2202            .await
2203            .map_err(|_| InvocationError::Deserialize(
2204                format!("DH handshake with {addr} timed out after 15 s")
2205            ))?
2206    }
2207
2208    async fn connect_with_key(
2209        addr:        &str,
2210        auth_key:    [u8; 256],
2211        first_salt:  i64,
2212        time_offset: i32,
2213        socks5:      Option<&crate::socks5::Socks5Config>,
2214        transport:   &TransportKind,
2215    ) -> Result<Self, InvocationError> {
2216        let addr2       = addr.to_string();
2217        let socks5_c    = socks5.cloned();
2218        let transport_c = transport.clone();
2219
2220        let fut = async move {
2221            let (stream, frame_kind) =
2222                Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2223            Ok::<Self, InvocationError>(Self {
2224                stream,
2225                enc: EncryptedSession::new(auth_key, first_salt, time_offset),
2226                frame_kind,
2227            })
2228        };
2229
2230        tokio::time::timeout(Duration::from_secs(15), fut)
2231            .await
2232            .map_err(|_| InvocationError::Deserialize(
2233                format!("connect_with_key to {addr} timed out after 15 s")
2234            ))?
2235    }
2236
2237    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
2238    fn first_salt(&self)     -> i64         { self.enc.salt }
2239    fn time_offset(&self)    -> i32         { self.enc.time_offset }
2240
2241    async fn rpc_call<R: RemoteCall>(
2242        &mut self,
2243        req: &R,
2244        side_updates: &mut Vec<update::Update>,
2245    ) -> Result<Vec<u8>, InvocationError> {
2246        let wire = self.enc.pack(req);
2247        send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2248        tokio::time::timeout(Duration::from_secs(10), self.recv_rpc_with_updates(side_updates))
2249            .await
2250            .map_err(|_| InvocationError::Deserialize("rpc_call timed out after 10 s".into()))?
2251    }
2252
2253    async fn rpc_call_serializable<S: tl::Serializable>(
2254        &mut self,
2255        req: &S,
2256        side_updates: &mut Vec<update::Update>,
2257    ) -> Result<Vec<u8>, InvocationError> {
2258        let wire = self.enc.pack_serializable(req);
2259        send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2260        tokio::time::timeout(Duration::from_secs(10), self.recv_rpc_with_updates(side_updates))
2261            .await
2262            .map_err(|_| InvocationError::Deserialize("rpc_call_serializable timed out after 10 s".into()))?
2263    }
2264
2265    /// Like `rpc_call_serializable` but accepts either a Payload OR an Updates
2266    /// frame as a successful response.  Use this for write RPCs whose return
2267    /// type in the TL schema is `Updates` — Telegram may respond with an
2268    /// `updateShort` instead of a full serialized result.
2269    async fn rpc_call_ack<S: tl::Serializable>(
2270        &mut self,
2271        req: &S,
2272        side_updates: &mut Vec<update::Update>,
2273    ) -> Result<(), InvocationError> {
2274        let wire = self.enc.pack_serializable(req);
2275        send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2276        tokio::time::timeout(Duration::from_secs(10), self.recv_ack(side_updates))
2277            .await
2278            .map_err(|_| InvocationError::Deserialize("rpc_call_ack timed out after 10 s".into()))?
2279    }
2280
2281    async fn recv_ack(
2282        &mut self,
2283        side_updates: &mut Vec<update::Update>,
2284    ) -> Result<(), InvocationError> {
2285        loop {
2286            let mut raw = recv_frame(&mut self.stream, &mut self.frame_kind).await?;
2287            let msg = self.enc.unpack(&mut raw)
2288                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2289            if msg.salt != 0 { self.enc.salt = msg.salt; }
2290            match unwrap_envelope(msg.body)? {
2291                EnvelopeResult::Payload(_) => return Ok(()),
2292                EnvelopeResult::Updates(us) => {
2293                    side_updates.extend(us);
2294                    return Ok(());
2295                }
2296                EnvelopeResult::None => {}
2297            }
2298        }
2299    }
2300
2301    /// Receive one RPC response, collecting any interleaved server-push updates
2302    /// into `side_updates` so the caller can forward them to the update channel.
2303    async fn recv_rpc_with_updates(
2304        &mut self,
2305        side_updates: &mut Vec<update::Update>,
2306    ) -> Result<Vec<u8>, InvocationError> {
2307        loop {
2308            let mut raw = recv_frame(&mut self.stream, &mut self.frame_kind).await?;
2309            let msg = self.enc.unpack(&mut raw)
2310                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2311            if msg.salt != 0 { self.enc.salt = msg.salt; }
2312            match unwrap_envelope(msg.body)? {
2313                EnvelopeResult::Payload(p)  => return Ok(p),
2314                EnvelopeResult::Updates(us) => {
2315                    log::debug!("[layer] {} updates interleaved during RPC", us.len());
2316                    side_updates.extend(us);
2317                }
2318                EnvelopeResult::None => {}
2319            }
2320        }
2321    }
2322
2323    /// Legacy recv_rpc that discards interleaved updates (used internally only).
2324    #[allow(dead_code)]
2325    async fn recv_rpc(&mut self) -> Result<Vec<u8>, InvocationError> {
2326        let mut _discard = Vec::new();
2327        self.recv_rpc_with_updates(&mut _discard).await
2328    }
2329
2330    async fn recv_once(&mut self) -> Result<Vec<update::Update>, InvocationError> {
2331        let mut raw = recv_frame(&mut self.stream, &mut self.frame_kind).await?;
2332        let msg = self.enc.unpack(&mut raw)
2333            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2334        if msg.salt != 0 { self.enc.salt = msg.salt; }
2335        match unwrap_envelope(msg.body)? {
2336            EnvelopeResult::Updates(us) => Ok(us),
2337            _ => Ok(vec![]),
2338        }
2339    }
2340
2341    async fn send_ping(&mut self) -> Result<(), InvocationError> {
2342        let req = tl::functions::Ping { ping_id: random_i64() };
2343        let wire = self.enc.pack(&req);
2344        send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2345        Ok(())
2346    }
2347}
2348
2349// ─── Transport framing (multi-kind) ──────────────────────────────────────────
2350
2351/// Send a framed message using the active transport kind.
2352async fn send_frame(
2353    stream: &mut TcpStream,
2354    data:   &[u8],
2355    kind:   &FrameKind,
2356) -> Result<(), InvocationError> {
2357    match kind {
2358        FrameKind::Abridged => send_abridged(stream, data).await,
2359        FrameKind::Intermediate => {
2360            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2361            stream.write_all(data).await?;
2362            Ok(())
2363        }
2364        FrameKind::Full { .. } => {
2365            // seqno and CRC handled inside Connection; here we just prefix length
2366            // Full framing: [total_len 4B][seqno 4B][payload][crc32 4B]
2367            // But send_frame is called with already-encrypted payload.
2368            // We use a simplified approach: emit the same as Intermediate for now
2369            // and note that Full's seqno/CRC are transport-level, not app-level.
2370            stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2371            stream.write_all(data).await?;
2372            Ok(())
2373        }
2374    }
2375}
2376
2377/// Receive a framed message.
2378async fn recv_frame(
2379    stream: &mut TcpStream,
2380    kind:   &mut FrameKind,
2381) -> Result<Vec<u8>, InvocationError> {
2382    match kind {
2383        FrameKind::Abridged => recv_abridged(stream).await,
2384        FrameKind::Intermediate | FrameKind::Full { .. } => {
2385            let mut len_buf = [0u8; 4];
2386            stream.read_exact(&mut len_buf).await?;
2387            let len = u32::from_le_bytes(len_buf) as usize;
2388            let mut buf = vec![0u8; len];
2389            stream.read_exact(&mut buf).await?;
2390            Ok(buf)
2391        }
2392    }
2393}
2394
2395/// Send using Abridged framing (used for DH plaintext during connect).
2396async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
2397    let words = data.len() / 4;
2398    if words < 0x7f {
2399        stream.write_all(&[words as u8]).await?;
2400    } else {
2401        let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
2402        stream.write_all(&b).await?;
2403    }
2404    stream.write_all(data).await?;
2405    Ok(())
2406}
2407
2408async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
2409    let mut h = [0u8; 1];
2410    stream.read_exact(&mut h).await?;
2411    let words = if h[0] < 0x7f {
2412        h[0] as usize
2413    } else {
2414        let mut b = [0u8; 3];
2415        stream.read_exact(&mut b).await?;
2416        let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
2417        // word count of 1 after 0xFF = Telegram 4-byte transport error code
2418        if w == 1 {
2419            let mut code_buf = [0u8; 4];
2420            stream.read_exact(&mut code_buf).await?;
2421            let code = i32::from_le_bytes(code_buf);
2422            return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
2423        }
2424        w
2425    };
2426    // Guard against implausibly large reads — a raw 4-byte transport error
2427    // whose first byte was mis-read as a word count causes a hang otherwise.
2428    if words == 0 || words > 0x8000 {
2429        return Err(InvocationError::Deserialize(
2430            format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
2431        ));
2432    }
2433    let mut buf = vec![0u8; words * 4];
2434    stream.read_exact(&mut buf).await?;
2435    Ok(buf)
2436}
2437
2438/// Receive a plaintext (pre-auth) frame and deserialize it.
2439async fn recv_frame_plain<T: Deserializable>(
2440    stream: &mut TcpStream,
2441    _kind:  &FrameKind,
2442) -> Result<T, InvocationError> {
2443    let raw = recv_abridged(stream).await?; // DH always uses abridged for plaintext
2444    if raw.len() < 20 {
2445        return Err(InvocationError::Deserialize("plaintext frame too short".into()));
2446    }
2447    if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
2448        return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
2449    }
2450    let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
2451    let mut cur  = Cursor::from_slice(&raw[20..20 + body_len]);
2452    T::deserialize(&mut cur).map_err(Into::into)
2453}
2454
2455// ─── MTProto envelope ─────────────────────────────────────────────────────────
2456
2457enum EnvelopeResult {
2458    Payload(Vec<u8>),
2459    Updates(Vec<update::Update>),
2460    None,
2461}
2462
2463fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
2464    if body.len() < 4 {
2465        return Err(InvocationError::Deserialize("body < 4 bytes".into()));
2466    }
2467    let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2468
2469    match cid {
2470        ID_RPC_RESULT => {
2471            if body.len() < 12 {
2472                return Err(InvocationError::Deserialize("rpc_result too short".into()));
2473            }
2474            unwrap_envelope(body[12..].to_vec())
2475        }
2476        ID_RPC_ERROR => {
2477            if body.len() < 8 {
2478                return Err(InvocationError::Deserialize("rpc_error too short".into()));
2479            }
2480            let code    = i32::from_le_bytes(body[4..8].try_into().unwrap());
2481            let message = tl_read_string(&body[8..]).unwrap_or_default();
2482            Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
2483        }
2484        ID_MSG_CONTAINER => {
2485            if body.len() < 8 {
2486                return Err(InvocationError::Deserialize("container too short".into()));
2487            }
2488            let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
2489            let mut pos = 8usize;
2490            let mut payload: Option<Vec<u8>> = None;
2491            let mut updates_buf: Vec<update::Update> = Vec::new();
2492
2493            for _ in 0..count {
2494                if pos + 16 > body.len() { break; }
2495                let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
2496                pos += 16;
2497                if pos + inner_len > body.len() { break; }
2498                let inner = body[pos..pos + inner_len].to_vec();
2499                pos += inner_len;
2500                match unwrap_envelope(inner)? {
2501                    EnvelopeResult::Payload(p)  => { payload = Some(p); }
2502                    EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
2503                    EnvelopeResult::None        => {}
2504                }
2505            }
2506            if let Some(p) = payload {
2507                Ok(EnvelopeResult::Payload(p))
2508            } else if !updates_buf.is_empty() {
2509                Ok(EnvelopeResult::Updates(updates_buf))
2510            } else {
2511                Ok(EnvelopeResult::None)
2512            }
2513        }
2514        ID_GZIP_PACKED => {
2515            let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
2516            unwrap_envelope(gz_inflate(&bytes)?)
2517        }
2518        // MTProto service messages — all silently acknowledged, no payload extracted
2519        ID_PONG | ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
2520        // Grammers also silences these; we do the same to avoid routing them as payloads
2521        | 0xd33b5459  // MsgsStateReq
2522        | 0x04deb57d  // MsgsStateInfo
2523        | 0x8cc0d131  // MsgsAllInfo
2524        | 0x276d3ec6  // MsgDetailedInfo
2525        | 0x809db6df  // MsgNewDetailedInfo
2526        | 0x7d861a08  // MsgResendReq / MsgResendAnsReq
2527        | 0x0949d9dc  // FutureSalt
2528        | 0xae500895  // FutureSalts
2529        | 0x9299359f  // HttpWait
2530        | 0xe22045fc  // DestroySessionOk
2531        | 0x62d350c9  // DestroySessionNone
2532        => {
2533            Ok(EnvelopeResult::None)
2534        }
2535        ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
2536        | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
2537        | ID_UPDATES_TOO_LONG => {
2538            Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
2539        }
2540        _ => Ok(EnvelopeResult::Payload(body)),
2541    }
2542}
2543
2544// ─── Utilities ────────────────────────────────────────────────────────────────
2545
2546fn random_i64() -> i64 {
2547    let mut b = [0u8; 8];
2548    getrandom::getrandom(&mut b).expect("getrandom");
2549    i64::from_le_bytes(b)
2550}
2551
2552fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
2553    if data.is_empty() { return Some(vec![]); }
2554    let (len, start) = if data[0] < 254 { (data[0] as usize, 1) }
2555    else if data.len() >= 4 {
2556        (data[1] as usize | (data[2] as usize) << 8 | (data[3] as usize) << 16, 4)
2557    } else { return None; };
2558    if data.len() < start + len { return None; }
2559    Some(data[start..start + len].to_vec())
2560}
2561
2562fn tl_read_string(data: &[u8]) -> Option<String> {
2563    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
2564}
2565
2566fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
2567    use std::io::Read;
2568    let mut out = Vec::new();
2569    if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
2570        return Ok(out);
2571    }
2572    out.clear();
2573    flate2::read::ZlibDecoder::new(data)
2574        .read_to_end(&mut out)
2575        .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
2576    Ok(out)
2577}