Skip to main content

layer_client/
lib.rs

1//! # layer-client — Production-grade async Telegram client
2//!
3//! A fully async, production-ready Telegram client built on top of the MTProto
4//! protocol. Inspired by and architecturally aligned with [`grammers`](https://codeberg.org/Lonami/grammers).
5//!
6//! ## Features
7//!
8//! - ✅ Full async/tokio I/O
9//! - ✅ User login (phone + code + 2FA SRP)
10//! - ✅ Bot token login (`bot_sign_in`)
11//! - ✅ `FLOOD_WAIT` auto-retry with configurable policy
12//! - ✅ Update stream (`stream_updates`) with typed events
13//! - ✅ Raw update access
14//! - ✅ `NewMessage`, `MessageEdited`, `MessageDeleted`, `CallbackQuery`, `InlineQuery`
15//! - ✅ Callback query answering
16//! - ✅ Inline query answering
17//! - ✅ Dialog iteration (`iter_dialogs`)
18//! - ✅ Message iteration (`iter_messages`)
19//! - ✅ Peer resolution (username, phone, ID)
20//! - ✅ Send / edit / delete messages
21//! - ✅ Forward messages
22//! - ✅ DC migration handling
23//! - ✅ Session persistence
24//! - ✅ Sign out
25//!
26//! ## Quick Start — User Account
27//!
28//! ```rust,no_run
29//! use layer_client::{Client, Config, SignInError};
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     let mut client = Client::connect(Config {
34//!         session_path: "my.session".into(),
35//!         api_id:       12345,
36//!         api_hash:     "abc123".into(),
37//!         ..Default::default()
38//!     }).await?;
39//!
40//!     if !client.is_authorized().await? {
41//!         let token = client.request_login_code("+1234567890").await?;
42//!         let code  = "12345"; // read from stdin
43//!
44//!         match client.sign_in(&token, code).await {
45//!             Ok(_)                                 => {}
46//!             Err(SignInError::PasswordRequired(t)) => {
47//!                 client.check_password(t, "my_password").await?;
48//!             }
49//!             Err(e) => return Err(e.into()),
50//!         }
51//!         client.save_session().await?;
52//!     }
53//!
54//!     client.send_message("me", "Hello from layer!").await?;
55//!     Ok(())
56//! }
57//! ```
58//!
59//! ## Quick Start — Bot
60//!
61//! ```rust,no_run
62//! use layer_client::{Client, Config, update::Update};
63//!
64//! #[tokio::main]
65//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
66//!     let mut client = Client::connect(Config {
67//!         session_path: "bot.session".into(),
68//!         api_id:       12345,
69//!         api_hash:     "abc123".into(),
70//!         ..Default::default()
71//!     }).await?;
72//!
73//!     client.bot_sign_in("1234567890:ABCdef...").await?;
74//!     client.save_session().await?;
75//!
76//!     let mut updates = client.stream_updates();
77//!     while let Ok(update) = updates.next().await {
78//!         if let Update::NewMessage(msg) = update {
79//!             if !msg.outgoing() {
80//!                 client.send_message_to_peer(msg.chat().unwrap().clone(), msg.text()).await?;
81//!             }
82//!         }
83//!     }
84//!     Ok(())
85//! }
86//! ```
87
88#![deny(unsafe_code)]
89
90mod errors;
91mod retry;
92mod session;
93mod transport;
94mod two_factor_auth;
95pub mod update;
96
97pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
98pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
99pub use update::Update;
100
101use std::collections::HashMap;
102use std::num::NonZeroU32;
103use std::ops::ControlFlow;
104use std::path::PathBuf;
105use std::sync::Arc;
106use std::time::Duration;
107
108use layer_mtproto::{EncryptedSession, Session, authentication as auth};
109use layer_tl_types::{Cursor, Deserializable, RemoteCall};
110use session::{DcEntry, PersistedSession};
111use tokio::io::{AsyncReadExt, AsyncWriteExt};
112use tokio::net::TcpStream;
113use tokio::sync::{mpsc, Mutex};
114use tokio::time::sleep;
115
116// ─── MTProto envelope constructor IDs ────────────────────────────────────────
117
118const ID_RPC_RESULT:      u32 = 0xf35c6d01;
119const ID_RPC_ERROR:       u32 = 0x2144ca19;
120const ID_MSG_CONTAINER:   u32 = 0x73f1f8dc;
121const ID_GZIP_PACKED:     u32 = 0x3072cfa1;
122const ID_PONG:            u32 = 0x347773c5;
123const ID_MSGS_ACK:        u32 = 0x62d6b459;
124const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
125const ID_NEW_SESSION:     u32 = 0x9ec20908;
126const ID_BAD_MSG_NOTIFY:  u32 = 0xa7eff811;
127const ID_UPDATES:         u32 = 0x74ae4240;
128const ID_UPDATE_SHORT:    u32 = 0x2114be86;
129const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
130const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
131
132// ─── Config ───────────────────────────────────────────────────────────────────
133
134/// Configuration for [`Client::connect`].
135#[derive(Clone)]
136pub struct Config {
137    /// Where to load/save the session file.
138    pub session_path: PathBuf,
139    /// Telegram API ID from <https://my.telegram.org>.
140    pub api_id:       i32,
141    /// Telegram API hash from <https://my.telegram.org>.
142    pub api_hash:     String,
143    /// Initial DC address to connect to (default: DC2).
144    pub dc_addr:      Option<String>,
145    /// Retry policy for flood-wait and I/O errors.
146    pub retry_policy: Arc<dyn RetryPolicy>,
147}
148
149impl Default for Config {
150    fn default() -> Self {
151        Self {
152            session_path: "layer.session".into(),
153            api_id:       0,
154            api_hash:     String::new(),
155            dc_addr:      None,
156            retry_policy: Arc::new(AutoSleep::default()),
157        }
158    }
159}
160
161// ─── UpdatesConfiguration ─────────────────────────────────────────────────────
162
163/// Configuration for [`Client::stream_updates`].
164#[derive(Debug, Clone)]
165pub struct UpdatesConfiguration {
166    /// Optional cap on the internal update queue.
167    /// Excess updates are dropped with a warning.
168    pub update_queue_limit: Option<usize>,
169}
170
171impl Default for UpdatesConfiguration {
172    fn default() -> Self {
173        Self { update_queue_limit: Some(500) }
174    }
175}
176
177// ─── UpdateStream ─────────────────────────────────────────────────────────────
178
179/// Asynchronous stream of [`Update`]s.
180///
181/// Obtain via [`Client::stream_updates`]. Call [`UpdateStream::next`] in a loop.
182pub struct UpdateStream {
183    rx: mpsc::UnboundedReceiver<update::Update>,
184}
185
186impl UpdateStream {
187    /// Wait for the next update.
188    ///
189    /// Returns `None` when the client has disconnected.
190    pub async fn next(&mut self) -> Option<update::Update> {
191        self.rx.recv().await
192    }
193}
194
195// ─── Dialog ───────────────────────────────────────────────────────────────────
196
197/// A Telegram dialog (chat, user, channel).
198#[derive(Debug, Clone)]
199pub struct Dialog {
200    pub raw:    layer_tl_types::enums::Dialog,
201    /// The top message in the dialog, if available.
202    pub message: Option<layer_tl_types::enums::Message>,
203    /// Entity (user/chat/channel) that corresponds to this dialog's peer.
204    pub entity:  Option<layer_tl_types::enums::User>,
205}
206
207impl Dialog {
208    /// The dialog's display title (username/first name/channel name).
209    pub fn title(&self) -> String {
210        if let Some(layer_tl_types::enums::User::User(u)) = &self.entity {
211            let first = u.first_name.as_deref().unwrap_or("");
212            let last  = u.last_name.as_deref().unwrap_or("");
213            return format!("{first} {last}").trim().to_string();
214        }
215        "(Unknown)".to_string()
216    }
217}
218
219// ─── Client (Arc-wrapped) ─────────────────────────────────────────────────────
220
221struct ClientInner {
222    conn:           Mutex<Connection>,
223    home_dc_id:     Mutex<i32>,
224    dc_options:     Mutex<HashMap<i32, DcEntry>>,
225    api_id:         i32,
226    api_hash:       String,
227    session_path:   PathBuf,
228    retry_policy:   Arc<dyn RetryPolicy>,
229    _update_tx:     mpsc::UnboundedSender<update::Update>,
230}
231
232/// The main Telegram client. Cheap to clone — internally Arc-wrapped.
233#[derive(Clone)]
234pub struct Client {
235    inner: Arc<ClientInner>,
236    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
237}
238
239impl Client {
240    // ── Connect ────────────────────────────────────────────────────────────
241
242    /// Connect to Telegram and return a ready-to-use client.
243    ///
244    /// Loads an existing session if the file exists, otherwise performs
245    /// a full DH key exchange on DC2.
246    pub async fn connect(config: Config) -> Result<Self, InvocationError> {
247        let (update_tx, update_rx) = mpsc::unbounded_channel();
248
249        // Try loading session
250        let (conn, home_dc_id, dc_opts) =
251            if config.session_path.exists() {
252                match PersistedSession::load(&config.session_path) {
253                    Ok(s) => {
254                        if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
255                            if let Some(key) = dc.auth_key {
256                                log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
257                                match Connection::connect_with_key(&dc.addr, key, dc.first_salt, dc.time_offset).await {
258                                    Ok(c) => {
259                                        let mut opts = session::default_dc_addresses()
260                                            .into_iter()
261                                            .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
262                                            .collect::<HashMap<_, _>>();
263                                        for d in &s.dcs {
264                                            opts.insert(d.dc_id, d.clone());
265                                        }
266                                        (c, s.home_dc_id, opts)
267                                    }
268                                    Err(e) => {
269                                        log::warn!("[layer] Session connect failed ({e}), fresh connect …");
270                                        Self::fresh_connect().await?
271                                    }
272                                }
273                            } else {
274                                Self::fresh_connect().await?
275                            }
276                        } else {
277                            Self::fresh_connect().await?
278                        }
279                    }
280                    Err(e) => {
281                        log::warn!("[layer] Session load failed ({e}), fresh connect …");
282                        Self::fresh_connect().await?
283                    }
284                }
285            } else {
286                Self::fresh_connect().await?
287            };
288
289        let inner = Arc::new(ClientInner {
290            conn:        Mutex::new(conn),
291            home_dc_id:  Mutex::new(home_dc_id),
292            dc_options:  Mutex::new(dc_opts),
293            api_id:      config.api_id,
294            api_hash:    config.api_hash,
295            session_path: config.session_path,
296            retry_policy: config.retry_policy,
297            _update_tx: update_tx,
298        });
299
300        let client = Self {
301            inner,
302            _update_rx: Arc::new(Mutex::new(update_rx)),
303        };
304
305        // Run initConnection to populate DC table
306        client.init_connection().await?;
307        Ok(client)
308    }
309
310    async fn fresh_connect() -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
311        log::info!("[layer] Fresh connect to DC2 …");
312        let conn = Connection::connect_raw("149.154.167.51:443").await?;
313        let opts = session::default_dc_addresses()
314            .into_iter()
315            .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
316            .collect();
317        Ok((conn, 2, opts))
318    }
319
320    // ── Session ────────────────────────────────────────────────────────────
321
322    /// Save the current session to disk.
323    pub async fn save_session(&self) -> Result<(), InvocationError> {
324        let conn_guard = self.inner.conn.lock().await;
325        let home_dc_id = *self.inner.home_dc_id.lock().await;
326        let dc_options = self.inner.dc_options.lock().await;
327
328        let dcs = dc_options.values().map(|e| {
329            DcEntry {
330                dc_id:      e.dc_id,
331                addr:       e.addr.clone(),
332                auth_key:   if e.dc_id == home_dc_id { Some(conn_guard.auth_key_bytes()) } else { e.auth_key },
333                first_salt:  if e.dc_id == home_dc_id { conn_guard.first_salt() } else { e.first_salt },
334                time_offset: if e.dc_id == home_dc_id { conn_guard.time_offset() } else { e.time_offset },
335            }
336        }).collect();
337
338        PersistedSession { home_dc_id, dcs }
339            .save(&self.inner.session_path)
340            .map_err(|e| InvocationError::Io(e))?;
341        log::info!("[layer] Session saved ✓");
342        Ok(())
343    }
344
345    // ── Auth ───────────────────────────────────────────────────────────────
346
347    /// Returns `true` if the client is already authorized.
348    pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
349        match self.invoke(&layer_tl_types::functions::updates::GetState {}).await {
350            Ok(_)  => Ok(true),
351            Err(e) if e.is("AUTH_KEY_UNREGISTERED") || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
352            Err(e) => Err(e),
353        }
354    }
355
356    /// Sign in as a bot using a bot token from [@BotFather](https://t.me/BotFather).
357    ///
358    /// # Example
359    /// ```rust,no_run
360    /// client.bot_sign_in("1234567890:ABCdef...").await?;
361    /// ```
362    pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
363        let req = layer_tl_types::functions::auth::ImportBotAuthorization {
364            flags:           0,
365            api_id:          self.inner.api_id,
366            api_hash:        self.inner.api_hash.clone(),
367            bot_auth_token:  token.to_string(),
368        };
369
370        let result = match self.invoke(&req).await {
371            Ok(x) => x,
372            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
373                let dc_id = r.value.unwrap_or(2) as i32;
374                self.migrate_to(dc_id).await?;
375                self.invoke(&req).await?
376            }
377            Err(e) => return Err(e),
378        };
379
380        let name = match result {
381            layer_tl_types::enums::auth::Authorization::Authorization(a) => {
382                Self::extract_user_name(&a.user)
383            }
384            layer_tl_types::enums::auth::Authorization::SignUpRequired(_) => {
385                panic!("unexpected SignUpRequired during bot sign-in")
386            }
387        };
388        log::info!("[layer] Bot signed in ✓  ({name})");
389        Ok(name)
390    }
391
392    /// Request a login code for a user account.
393    ///
394    /// Returns a [`LoginToken`] to pass to [`sign_in`].
395    ///
396    /// [`sign_in`]: Self::sign_in
397    pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
398        use layer_tl_types::enums::auth::SentCode;
399
400        let req = self.make_send_code_req(phone);
401        let body = match self.rpc_call_raw(&req).await {
402            Ok(b)  => b,
403            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
404                let dc_id = r.value.unwrap_or(2) as i32;
405                self.migrate_to(dc_id).await?;
406                self.rpc_call_raw(&req).await?
407            }
408            Err(e) => return Err(e),
409        };
410
411        let mut cur = Cursor::from_slice(&body);
412        let hash = match layer_tl_types::enums::auth::SentCode::deserialize(&mut cur)? {
413            SentCode::SentCode(c)        => c.phone_code_hash,
414            SentCode::Success(_)         => return Err(InvocationError::Deserialize("unexpected SentCode::Success".into())),
415            SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required".into())),
416        };
417        log::info!("[layer] Login code sent");
418        Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
419    }
420
421    /// Complete sign-in with the code sent to the phone.
422    ///
423    /// On 2FA accounts, returns `Err(SignInError::PasswordRequired(token))`.
424    /// Pass the token to [`check_password`].
425    ///
426    /// [`check_password`]: Self::check_password
427    pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
428        let req = layer_tl_types::functions::auth::SignIn {
429            phone_number:       token.phone.clone(),
430            phone_code_hash:    token.phone_code_hash.clone(),
431            phone_code:         Some(code.trim().to_string()),
432            email_verification: None,
433        };
434
435        let body = match self.rpc_call_raw(&req).await {
436            Ok(b) => b,
437            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
438                let dc_id = r.value.unwrap_or(2) as i32;
439                self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
440                self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
441            }
442            Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
443                let token = self.get_password_info().await.map_err(SignInError::Other)?;
444                return Err(SignInError::PasswordRequired(token));
445            }
446            Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
447            Err(e) => return Err(SignInError::Other(e)),
448        };
449
450        let mut cur = Cursor::from_slice(&body);
451        match layer_tl_types::enums::auth::Authorization::deserialize(&mut cur)
452            .map_err(|e| SignInError::Other(e.into()))?
453        {
454            layer_tl_types::enums::auth::Authorization::Authorization(a) => {
455                let name = Self::extract_user_name(&a.user);
456                log::info!("[layer] Signed in ✓  Welcome, {name}!");
457                Ok(name)
458            }
459            layer_tl_types::enums::auth::Authorization::SignUpRequired(_) => {
460                Err(SignInError::SignUpRequired)
461            }
462        }
463    }
464
465    /// Complete 2FA login.
466    ///
467    /// `token` comes from `Err(SignInError::PasswordRequired(token))`.
468    pub async fn check_password(
469        &self,
470        token: PasswordToken,
471        password: impl AsRef<[u8]>,
472    ) -> Result<String, InvocationError> {
473        let pw   = token.password;
474        let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
475
476        let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
477        let g_b  = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
478        let a    = pw.secure_random;
479        let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
480
481        let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
482
483        let req = layer_tl_types::functions::auth::CheckPassword {
484            password: layer_tl_types::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
485                layer_tl_types::types::InputCheckPasswordSrp {
486                    srp_id,
487                    a: g_a.to_vec(),
488                    m1: m1.to_vec(),
489                },
490            ),
491        };
492
493        let body = self.rpc_call_raw(&req).await?;
494        let mut cur = Cursor::from_slice(&body);
495        match layer_tl_types::enums::auth::Authorization::deserialize(&mut cur)? {
496            layer_tl_types::enums::auth::Authorization::Authorization(a) => {
497                let name = Self::extract_user_name(&a.user);
498                log::info!("[layer] 2FA ✓  Welcome, {name}!");
499                Ok(name)
500            }
501            layer_tl_types::enums::auth::Authorization::SignUpRequired(_) => {
502                Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into()))
503            }
504        }
505    }
506
507    /// Sign out and invalidate the current session.
508    pub async fn sign_out(&self) -> Result<bool, InvocationError> {
509        let req = layer_tl_types::functions::auth::LogOut {};
510        match self.rpc_call_raw(&req).await {
511            Ok(_body) => {
512                // auth.loggedOut#c3a2835f flags:# future_auth_token:flags.0?bytes = auth.LoggedOut
513                log::info!("[layer] Signed out ✓");
514                Ok(true)
515            }
516            Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
517            Err(e) => Err(e),
518        }
519    }
520
521    // ── Updates ────────────────────────────────────────────────────────────
522
523    /// Return an [`UpdateStream`] that yields incoming [`Update`]s.
524    ///
525    /// The stream must be polled regularly (e.g. in a `while let` loop) for
526    /// the client to stay connected and receive updates. Multiple streams
527    /// can be created but only one should be polled at a time.
528    pub fn stream_updates(&self) -> UpdateStream {
529        let (tx, rx) = mpsc::unbounded_channel();
530        // Subscribe this new channel to the inner broadcaster
531        // (we replace the stored sender so future updates go to this receiver)
532        // In a real production impl you'd use a broadcast channel; here we
533        // use a simple mpsc and pipe through a background task.
534        let client = self.clone();
535        tokio::spawn(async move {
536            client.run_update_loop(tx).await;
537        });
538        UpdateStream { rx }
539    }
540
541    /// Internal update loop — polls the connection for incoming data and
542    /// dispatches updates to the given sender.
543    async fn run_update_loop(&self, tx: mpsc::UnboundedSender<update::Update>) {
544        // Send periodic pings (every 60 s) while polling for updates.
545        // In this simplified model we just do a blocking recv call in a loop.
546        loop {
547            // We need to recv from the socket while NOT holding the conn lock
548            // during await. Use a timeout approach.
549            let result = {
550                let mut conn = self.inner.conn.lock().await;
551                // Set a short timeout and check for data
552                match tokio::time::timeout(
553                    Duration::from_secs(30),
554                    conn.recv_once()
555                ).await {
556                    Ok(Ok(updates)) => Ok(updates),
557                    Ok(Err(e))      => Err(e),
558                    Err(_timeout)   => {
559                        // Send a ping to keep connection alive
560                        let _ = conn.send_ping().await;
561                        continue;
562                    }
563                }
564            };
565
566            match result {
567                Ok(updates) => {
568                    for u in updates {
569                        let _ = tx.send(u);
570                    }
571                }
572                Err(e) => {
573                    log::warn!("[layer] Update loop error: {e} — reconnecting …");
574                    sleep(Duration::from_secs(1)).await;
575                    // Attempt reconnect
576                    let home_dc_id = *self.inner.home_dc_id.lock().await;
577                    let addr = {
578                        let opts = self.inner.dc_options.lock().await;
579                        opts.get(&home_dc_id).map(|e| e.addr.clone()).unwrap_or_else(|| "149.154.167.51:443".to_string())
580                    };
581                    match Connection::connect_raw(&addr).await {
582                        Ok(new_conn) => {
583                            *self.inner.conn.lock().await = new_conn;
584                            let _ = self.init_connection().await;
585                        }
586                        Err(e2) => {
587                            log::error!("[layer] Reconnect failed: {e2}");
588                            break;
589                        }
590                    }
591                }
592            }
593        }
594    }
595
596    // ── Messaging ──────────────────────────────────────────────────────────
597
598    /// Send a text message. Use `"me"` for Saved Messages.
599    pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
600        let input_peer = self.resolve_peer(peer).await?;
601        self.send_message_to_peer(input_peer, text).await
602    }
603
604    /// Send a text message to an already-resolved [`layer_tl_types::enums::InputPeer`].
605    pub async fn send_message_to_peer(
606        &self,
607        peer: layer_tl_types::enums::Peer,
608        text: &str,
609    ) -> Result<(), InvocationError> {
610        let input_peer = peer_to_input_peer(peer);
611        let req = layer_tl_types::functions::messages::SendMessage {
612            no_webpage:                 false,
613            silent:                     false,
614            background:                 false,
615            clear_draft:                false,
616            noforwards:                 false,
617            update_stickersets_order:   false,
618            invert_media:               false,
619            allow_paid_floodskip:       false,
620            peer:                       input_peer,
621            reply_to:                   None,
622            message:                    text.to_string(),
623            random_id:                  random_i64(),
624            reply_markup:               None,
625            entities:                   None,
626            schedule_date:              None,
627            schedule_repeat_period:     None,
628            send_as:                    None,
629            quick_reply_shortcut:       None,
630            effect:                     None,
631            allow_paid_stars:           None,
632            suggested_post:             None,
633        };
634        self.rpc_call_raw(&req).await?;
635        Ok(())
636    }
637
638    /// Send a text message directly to "me" (Saved Messages).
639    pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
640        let req = layer_tl_types::functions::messages::SendMessage {
641            no_webpage:                 false,
642            silent:                     false,
643            background:                 false,
644            clear_draft:                false,
645            noforwards:                 false,
646            update_stickersets_order:   false,
647            invert_media:               false,
648            allow_paid_floodskip:       false,
649            peer:                       layer_tl_types::enums::InputPeer::PeerSelf,
650            reply_to:                   None,
651            message:                    text.to_string(),
652            random_id:                  random_i64(),
653            reply_markup:               None,
654            entities:                   None,
655            schedule_date:              None,
656            schedule_repeat_period:     None,
657            send_as:                    None,
658            quick_reply_shortcut:       None,
659            effect:                     None,
660            allow_paid_stars:           None,
661            suggested_post:             None,
662        };
663        self.rpc_call_raw(&req).await?;
664        Ok(())
665    }
666
667    /// Delete messages by ID in a given peer.
668    pub async fn delete_messages(
669        &self,
670        message_ids: Vec<i32>,
671        revoke: bool,
672    ) -> Result<(), InvocationError> {
673        let req = layer_tl_types::functions::messages::DeleteMessages {
674            revoke,
675            id: message_ids,
676        };
677        self.rpc_call_raw(&req).await?;
678        Ok(())
679    }
680
681    // ── Callback Queries ───────────────────────────────────────────────────
682
683    /// Answer a callback query (from an inline button press).
684    ///
685    /// Pass the `query_id` from [`update::CallbackQuery::query_id`].
686    pub async fn answer_callback_query(
687        &self,
688        query_id: i64,
689        text: Option<&str>,
690        alert: bool,
691    ) -> Result<bool, InvocationError> {
692        let req = layer_tl_types::functions::messages::SetBotCallbackAnswer {
693            alert,
694            query_id,
695            message: text.map(|s| s.to_string()),
696            url:     None,
697            cache_time: 0,
698        };
699        let body = self.rpc_call_raw(&req).await?;
700        Ok(!body.is_empty())
701    }
702
703    // ── Inline Queries ─────────────────────────────────────────────────────
704
705    /// Answer an inline query.
706    ///
707    /// `results` should be a list of `tl::enums::InputBotInlineResult`.
708    pub async fn answer_inline_query(
709        &self,
710        query_id: i64,
711        results: Vec<layer_tl_types::enums::InputBotInlineResult>,
712        cache_time: i32,
713        is_personal: bool,
714        next_offset: Option<String>,
715    ) -> Result<bool, InvocationError> {
716        let req = layer_tl_types::functions::messages::SetInlineBotResults {
717            gallery:     false,
718            private:     is_personal,
719            query_id,
720            results,
721            cache_time,
722            next_offset,
723            switch_pm:   None,
724            switch_webview: None,
725        };
726        let body = self.rpc_call_raw(&req).await?;
727        Ok(!body.is_empty())
728    }
729
730    // ── Dialogs ────────────────────────────────────────────────────────────
731
732    /// Fetch up to `limit` dialogs (conversations), most recent first.
733    ///
734    /// Returns a `Vec<Dialog>`. For paginated access, call repeatedly with
735    /// offset parameters derived from the last result.
736    pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
737        let req = layer_tl_types::functions::messages::GetDialogs {
738            exclude_pinned: false,
739            folder_id:      None,
740            offset_date:    0,
741            offset_id:      0,
742            offset_peer:    layer_tl_types::enums::InputPeer::Empty,
743            limit,
744            hash:           0,
745        };
746
747        let body    = self.rpc_call_raw(&req).await?;
748        let mut cur = Cursor::from_slice(&body);
749        let dialogs = match layer_tl_types::enums::messages::Dialogs::deserialize(&mut cur)? {
750            layer_tl_types::enums::messages::Dialogs::Dialogs(d) => d,
751            layer_tl_types::enums::messages::Dialogs::Slice(d)   => {
752                layer_tl_types::types::messages::Dialogs {
753                    dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
754                }
755            }
756            layer_tl_types::enums::messages::Dialogs::NotModified(_) => {
757                return Ok(vec![]);
758            }
759        };
760
761        let result = dialogs.dialogs.into_iter().map(|d| Dialog {
762            raw:     d,
763            message: None,
764            entity:  None,
765        }).collect();
766        Ok(result)
767    }
768
769    // ── Messages ───────────────────────────────────────────────────────────
770
771    /// Fetch messages from a peer's history.
772    pub async fn get_messages(
773        &self,
774        peer: layer_tl_types::enums::InputPeer,
775        limit: i32,
776        offset_id: i32,
777    ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
778        let req = layer_tl_types::functions::messages::GetHistory {
779            peer,
780            offset_id,
781            offset_date: 0,
782            add_offset:  0,
783            limit,
784            max_id:      0,
785            min_id:      0,
786            hash:        0,
787        };
788
789        let body    = self.rpc_call_raw(&req).await?;
790        let mut cur = Cursor::from_slice(&body);
791        let messages = match layer_tl_types::enums::messages::Messages::deserialize(&mut cur)? {
792            layer_tl_types::enums::messages::Messages::Messages(m) => m.messages,
793            layer_tl_types::enums::messages::Messages::Slice(m)    => m.messages,
794            layer_tl_types::enums::messages::Messages::ChannelMessages(m) => m.messages,
795            layer_tl_types::enums::messages::Messages::NotModified(_)    => vec![],
796        };
797
798        Ok(messages.into_iter().map(update::IncomingMessage::from_raw).collect())
799    }
800
801    // ── Peer resolution ────────────────────────────────────────────────────
802
803    /// Resolve a peer string (`"me"`, `"@username"`, phone, or numeric ID)
804    /// to an [`InputPeer`](layer_tl_types::enums::InputPeer).
805    pub async fn resolve_peer(
806        &self,
807        peer: &str,
808    ) -> Result<layer_tl_types::enums::Peer, InvocationError> {
809        match peer.trim() {
810            "me" | "self" => Ok(layer_tl_types::enums::Peer::User(
811                layer_tl_types::types::PeerUser { user_id: 0 }
812            )),
813            username if username.starts_with('@') => {
814                self.resolve_username(&username[1..]).await
815            }
816            id_str => {
817                if let Ok(id) = id_str.parse::<i64>() {
818                    Ok(layer_tl_types::enums::Peer::User(
819                        layer_tl_types::types::PeerUser { user_id: id }
820                    ))
821                } else {
822                    Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
823                }
824            }
825        }
826    }
827
828    async fn resolve_username(&self, username: &str) -> Result<layer_tl_types::enums::Peer, InvocationError> {
829        let req  = layer_tl_types::functions::contacts::ResolveUsername { username: username.to_string(), referer: None };
830        let body = self.rpc_call_raw(&req).await?;
831        let mut cur = Cursor::from_slice(&body);
832        let resolved = match layer_tl_types::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
833            layer_tl_types::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
834        };
835        Ok(resolved.peer)
836    }
837
838    // ── Raw invoke ─────────────────────────────────────────────────────────
839
840    /// Invoke any TL function directly.
841    ///
842    /// Handles flood-wait and I/O retries according to the configured
843    /// [`RetryPolicy`].
844    pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
845        let body = self.rpc_call_raw(req).await?;
846        let mut cur = Cursor::from_slice(&body);
847        R::Return::deserialize(&mut cur).map_err(Into::into)
848    }
849
850    /// Invoke and return the raw response bytes (before TL deserialization).
851    async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
852        let mut fail_count   = NonZeroU32::new(1).unwrap();
853        let mut slept_so_far = Duration::default();
854
855        loop {
856            match self.do_rpc_call(req).await {
857                Ok(body) => return Ok(body),
858                Err(e) => {
859                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
860                    match self.inner.retry_policy.should_retry(&ctx) {
861                        ControlFlow::Continue(delay) => {
862                            sleep(delay).await;
863                            slept_so_far += delay;
864                            fail_count = fail_count.saturating_add(1);
865                        }
866                        ControlFlow::Break(()) => return Err(ctx.error),
867                    }
868                }
869            }
870        }
871    }
872
873    async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
874        let mut conn = self.inner.conn.lock().await;
875        conn.rpc_call(req).await
876    }
877
878    // ── initConnection ─────────────────────────────────────────────────────
879
880    async fn init_connection(&self) -> Result<(), InvocationError> {
881        use layer_tl_types::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
882        let req = InvokeWithLayer {
883            layer: layer_tl_types::LAYER,
884            query: InitConnection {
885                api_id:           self.inner.api_id,
886                device_model:     "Linux".to_string(),
887                system_version:   "1.0".to_string(),
888                app_version:      "0.1.0".to_string(),
889                system_lang_code: "en".to_string(),
890                lang_pack:        "".to_string(),
891                lang_code:        "en".to_string(),
892                proxy:            None,
893                params:           None,
894                query:            GetConfig {},
895            },
896        };
897
898        let body = {
899            let mut conn = self.inner.conn.lock().await;
900            conn.rpc_call_serializable(&req).await?
901        };
902
903        let mut cur = Cursor::from_slice(&body);
904        if let Ok(layer_tl_types::enums::Config::Config(cfg)) =
905            layer_tl_types::enums::Config::deserialize(&mut cur)
906        {
907            let mut opts = self.inner.dc_options.lock().await;
908            for opt in &cfg.dc_options {
909                let layer_tl_types::enums::DcOption::DcOption(o) = opt;
910                if o.media_only || o.cdn || o.tcpo_only || o.ipv6 { continue; }
911                let addr = format!("{}:{}", o.ip_address, o.port);
912                let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
913                    dc_id: o.id, addr: addr.clone(),
914                    auth_key: None, first_salt: 0, time_offset: 0,
915                });
916                entry.addr = addr;
917            }
918            log::info!("[layer] initConnection ✓  ({} DCs known)", cfg.dc_options.len());
919        }
920        Ok(())
921    }
922
923    // ── DC migration ───────────────────────────────────────────────────────
924
925    async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
926        let addr = {
927            let opts = self.inner.dc_options.lock().await;
928            opts.get(&new_dc_id).map(|e| e.addr.clone())
929                .unwrap_or_else(|| "149.154.167.51:443".to_string())
930        };
931
932        log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
933
934        let saved_key = {
935            let opts = self.inner.dc_options.lock().await;
936            opts.get(&new_dc_id).and_then(|e| e.auth_key)
937        };
938
939        let conn = if let Some(key) = saved_key {
940            Connection::connect_with_key(&addr, key, 0, 0).await?
941        } else {
942            Connection::connect_raw(&addr).await?
943        };
944
945        let new_key = conn.auth_key_bytes();
946        {
947            let mut opts = self.inner.dc_options.lock().await;
948            let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
949                dc_id: new_dc_id, addr: addr.clone(),
950                auth_key: None, first_salt: 0, time_offset: 0,
951            });
952            entry.auth_key = Some(new_key);
953        }
954
955        *self.inner.conn.lock().await = conn;
956        *self.inner.home_dc_id.lock().await = new_dc_id;
957        self.init_connection().await?;
958        log::info!("[layer] Now on DC{new_dc_id} ✓");
959        Ok(())
960    }
961
962    // ── Private helpers ────────────────────────────────────────────────────
963
964    async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
965        let body    = self.rpc_call_raw(&layer_tl_types::functions::account::GetPassword {}).await?;
966        let mut cur = Cursor::from_slice(&body);
967        let pw = match layer_tl_types::enums::account::Password::deserialize(&mut cur)? {
968            layer_tl_types::enums::account::Password::Password(p) => p,
969        };
970        Ok(PasswordToken { password: pw })
971    }
972
973    fn make_send_code_req(&self, phone: &str) -> layer_tl_types::functions::auth::SendCode {
974        layer_tl_types::functions::auth::SendCode {
975            phone_number: phone.to_string(),
976            api_id:       self.inner.api_id,
977            api_hash:     self.inner.api_hash.clone(),
978            settings:     layer_tl_types::enums::CodeSettings::CodeSettings(
979                layer_tl_types::types::CodeSettings {
980                    allow_flashcall:  false, current_number: false, allow_app_hash: false,
981                    allow_missed_call: false, allow_firebase: false, unknown_number: false,
982                    logout_tokens: None, token: None, app_sandbox: None,
983                },
984            ),
985        }
986    }
987
988    fn extract_user_name(user: &layer_tl_types::enums::User) -> String {
989        match user {
990            layer_tl_types::enums::User::User(u) => {
991                format!("{} {}", u.first_name.as_deref().unwrap_or(""),
992                                  u.last_name.as_deref().unwrap_or(""))
993                    .trim().to_string()
994            }
995            layer_tl_types::enums::User::Empty(_) => "(unknown)".into(),
996        }
997    }
998
999    fn extract_password_params(
1000        algo: &layer_tl_types::enums::PasswordKdfAlgo,
1001    ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
1002        match algo {
1003            layer_tl_types::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
1004                Ok((&a.salt1, &a.salt2, &a.p, a.g))
1005            }
1006            _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
1007        }
1008    }
1009}
1010
1011// ─── Connection ───────────────────────────────────────────────────────────────
1012
1013/// A single async MTProto connection to one DC.
1014struct Connection {
1015    stream:     TcpStream,
1016    enc:        EncryptedSession,
1017}
1018
1019impl Connection {
1020    async fn connect_raw(addr: &str) -> Result<Self, InvocationError> {
1021        log::info!("[layer] Connecting to {addr} (DH) …");
1022        let mut stream = TcpStream::connect(addr).await?;
1023
1024        // Send abridged init byte
1025        stream.write_all(&[0xef]).await?;
1026
1027        let mut plain = Session::new();
1028
1029        // Step 1
1030        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1031        send_plain(&mut stream, &plain.pack(&req1).to_plaintext_bytes()).await?;
1032        let res_pq: layer_tl_types::enums::ResPq = recv_plain(&mut stream).await?;
1033
1034        // Step 2
1035        let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1036        send_plain(&mut stream, &plain.pack(&req2).to_plaintext_bytes()).await?;
1037        let dh: layer_tl_types::enums::ServerDhParams = recv_plain(&mut stream).await?;
1038
1039        // Step 3
1040        let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1041        send_plain(&mut stream, &plain.pack(&req3).to_plaintext_bytes()).await?;
1042        let ans: layer_tl_types::enums::SetClientDhParamsAnswer = recv_plain(&mut stream).await?;
1043
1044        let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1045        log::info!("[layer] DH complete ✓");
1046
1047        Ok(Self {
1048            stream,
1049            enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
1050        })
1051    }
1052
1053    async fn connect_with_key(
1054        addr: &str,
1055        auth_key: [u8; 256],
1056        first_salt: i64,
1057        time_offset: i32,
1058    ) -> Result<Self, InvocationError> {
1059        let mut stream = TcpStream::connect(addr).await?;
1060        stream.write_all(&[0xef]).await?;
1061        Ok(Self {
1062            stream,
1063            enc: EncryptedSession::new(auth_key, first_salt, time_offset),
1064        })
1065    }
1066
1067    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
1068    fn first_salt(&self)     -> i64        { self.enc.salt }
1069    fn time_offset(&self)    -> i32        { self.enc.time_offset }
1070
1071    async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
1072        let wire = self.enc.pack(req);
1073        send_abridged(&mut self.stream, &wire).await?;
1074        self.recv_rpc().await
1075    }
1076
1077    /// Like `rpc_call` but only requires `Serializable`, bypassing the `Deserializable`
1078    /// bound on `RemoteCall` that the code-generated `InvokeWithLayer` imposes.
1079    async fn rpc_call_serializable<S: layer_tl_types::Serializable>(&mut self, req: &S) -> Result<Vec<u8>, InvocationError> {
1080        let wire = self.enc.pack_serializable(req);
1081        send_abridged(&mut self.stream, &wire).await?;
1082        self.recv_rpc().await
1083    }
1084
1085    async fn recv_rpc(&mut self) -> Result<Vec<u8>, InvocationError> {
1086        loop {
1087            let mut raw = recv_abridged(&mut self.stream).await?;
1088            let msg = self.enc.unpack(&mut raw)
1089                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1090            if msg.salt != 0 { self.enc.salt = msg.salt; }
1091            match unwrap_envelope(msg.body)? {
1092                EnvelopeResult::Payload(p) => return Ok(p),
1093                EnvelopeResult::Updates(updates) => {
1094                    // Updates received while waiting for RPC result — buffer or discard
1095                    // In production we'd forward these to the update channel
1096                    log::debug!("[layer] {} updates received during RPC call", updates.len());
1097                }
1098                EnvelopeResult::None => {}
1099            }
1100        }
1101    }
1102
1103    /// Receive a single raw frame and parse updates from it (for the update loop).
1104    async fn recv_once(&mut self) -> Result<Vec<update::Update>, InvocationError> {
1105        let mut raw = recv_abridged(&mut self.stream).await?;
1106        let msg = self.enc.unpack(&mut raw)
1107            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1108        if msg.salt != 0 { self.enc.salt = msg.salt; }
1109        match unwrap_envelope(msg.body)? {
1110            EnvelopeResult::Updates(updates) => Ok(updates),
1111            _ => Ok(vec![]),
1112        }
1113    }
1114
1115    async fn send_ping(&mut self) -> Result<(), InvocationError> {
1116        let ping_id = random_i64();
1117        let req = layer_tl_types::functions::Ping { ping_id };
1118        let wire = self.enc.pack(&req);
1119        send_abridged(&mut self.stream, &wire).await?;
1120        Ok(())
1121    }
1122}
1123
1124// ─── Abridged transport helpers ───────────────────────────────────────────────
1125
1126async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
1127    let words = data.len() / 4;
1128    if words < 0x7f {
1129        stream.write_all(&[words as u8]).await?;
1130    } else {
1131        let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
1132        stream.write_all(&b).await?;
1133    }
1134    stream.write_all(data).await?;
1135    Ok(())
1136}
1137
1138async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
1139    let mut h = [0u8; 1];
1140    stream.read_exact(&mut h).await?;
1141    let words = if h[0] < 0x7f {
1142        h[0] as usize
1143    } else {
1144        let mut b = [0u8; 3];
1145        stream.read_exact(&mut b).await?;
1146        b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
1147    };
1148    let mut buf = vec![0u8; words * 4];
1149    stream.read_exact(&mut buf).await?;
1150    Ok(buf)
1151}
1152
1153async fn send_plain(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
1154    send_abridged(stream, data).await
1155}
1156
1157async fn recv_plain<T: Deserializable>(stream: &mut TcpStream) -> Result<T, InvocationError> {
1158    let raw = recv_abridged(stream).await?;
1159    if raw.len() < 20 { return Err(InvocationError::Deserialize("plaintext frame too short".into())); }
1160    if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1161        return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
1162    }
1163    let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1164    let mut cur  = Cursor::from_slice(&raw[20..20 + body_len]);
1165    T::deserialize(&mut cur).map_err(Into::into)
1166}
1167
1168// ─── MTProto envelope unwrapper ───────────────────────────────────────────────
1169
1170enum EnvelopeResult {
1171    Payload(Vec<u8>),
1172    Updates(Vec<update::Update>),
1173    None,
1174}
1175
1176fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
1177    if body.len() < 4 { return Err(InvocationError::Deserialize("body < 4 bytes".into())); }
1178    let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1179
1180    match cid {
1181        ID_RPC_RESULT => {
1182            if body.len() < 12 { return Err(InvocationError::Deserialize("rpc_result too short".into())); }
1183            unwrap_envelope(body[12..].to_vec())
1184        }
1185        ID_RPC_ERROR => {
1186            if body.len() < 8 { return Err(InvocationError::Deserialize("rpc_error too short".into())); }
1187            let code    = i32::from_le_bytes(body[4..8].try_into().unwrap());
1188            let message = tl_read_string(&body[8..]).unwrap_or_default();
1189            Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
1190        }
1191        ID_MSG_CONTAINER => {
1192            if body.len() < 8 { return Err(InvocationError::Deserialize("container too short".into())); }
1193            let count   = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1194            let mut pos = 8usize;
1195            let mut payload: Option<Vec<u8>> = None;
1196            let mut updates_buf: Vec<update::Update> = Vec::new();
1197
1198            for _ in 0..count {
1199                if pos + 16 > body.len() { break; }
1200                let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1201                pos += 16;
1202                if pos + inner_len > body.len() { break; }
1203                let inner = body[pos..pos + inner_len].to_vec();
1204                pos += inner_len;
1205                match unwrap_envelope(inner)? {
1206                    EnvelopeResult::Payload(p)  => { payload = Some(p); }
1207                    EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
1208                    EnvelopeResult::None        => {}
1209                }
1210            }
1211            if let Some(p) = payload {
1212                Ok(EnvelopeResult::Payload(p))
1213            } else if !updates_buf.is_empty() {
1214                Ok(EnvelopeResult::Updates(updates_buf))
1215            } else {
1216                Ok(EnvelopeResult::None)
1217            }
1218        }
1219        ID_GZIP_PACKED => {
1220            let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
1221            unwrap_envelope(gz_inflate(&bytes)?)
1222        }
1223        ID_PONG | ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY => {
1224            Ok(EnvelopeResult::None)
1225        }
1226        // Updates
1227        ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED | ID_UPDATE_SHORT_MSG => {
1228            parse_updates_envelope(cid, &body)
1229        }
1230        _ => Ok(EnvelopeResult::Payload(body)),
1231    }
1232}
1233
1234fn parse_updates_envelope(_cid: u32, body: &[u8]) -> Result<EnvelopeResult, InvocationError> {
1235    let updates = update::parse_updates(body);
1236    Ok(EnvelopeResult::Updates(updates))
1237}
1238
1239// ─── Helper functions ─────────────────────────────────────────────────────────
1240
1241fn peer_to_input_peer(peer: layer_tl_types::enums::Peer) -> layer_tl_types::enums::InputPeer {
1242    match peer {
1243        layer_tl_types::enums::Peer::User(u) => {
1244            if u.user_id == 0 {
1245                layer_tl_types::enums::InputPeer::PeerSelf
1246            } else {
1247                layer_tl_types::enums::InputPeer::User(
1248                    layer_tl_types::types::InputPeerUser { user_id: u.user_id, access_hash: 0 }
1249                )
1250            }
1251        }
1252        layer_tl_types::enums::Peer::Chat(c) => {
1253            layer_tl_types::enums::InputPeer::Chat(
1254                layer_tl_types::types::InputPeerChat { chat_id: c.chat_id }
1255            )
1256        }
1257        layer_tl_types::enums::Peer::Channel(c) => {
1258            layer_tl_types::enums::InputPeer::Channel(
1259                layer_tl_types::types::InputPeerChannel { channel_id: c.channel_id, access_hash: 0 }
1260            )
1261        }
1262    }
1263}
1264
1265fn random_i64() -> i64 {
1266    let mut b = [0u8; 8];
1267    getrandom::getrandom(&mut b).expect("getrandom");
1268    i64::from_le_bytes(b)
1269}
1270
1271fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
1272    if data.is_empty() { return Some(vec![]); }
1273    let (len, start) = if data[0] < 254 { (data[0] as usize, 1) }
1274    else if data.len() >= 4 { (data[1] as usize | (data[2] as usize) << 8 | (data[3] as usize) << 16, 4) }
1275    else { return None; };
1276    if data.len() < start + len { return None; }
1277    Some(data[start..start + len].to_vec())
1278}
1279
1280fn tl_read_string(data: &[u8]) -> Option<String> {
1281    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
1282}
1283
1284fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
1285    use std::io::Read;
1286    let mut out = Vec::new();
1287    if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
1288        return Ok(out);
1289    }
1290    out.clear();
1291    flate2::read::ZlibDecoder::new(data)
1292        .read_to_end(&mut out)
1293        .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
1294    Ok(out)
1295}