1#![doc(html_root_url = "https://docs.rs/layer-client/0.4.0")]
2#![deny(unsafe_code)]
22
23mod errors;
24mod retry;
25mod session;
26mod transport;
27mod two_factor_auth;
28pub mod update;
29pub mod parsers;
30pub mod media;
31pub mod participants;
32pub mod pts;
33
34pub mod dc_pool;
36pub mod transport_obfuscated;
37pub mod transport_intermediate;
38pub mod socks5;
39pub mod session_backend;
40pub mod inline_iter;
41pub mod typing_guard;
42pub mod keyboard;
43pub mod search;
44pub mod types;
45
46#[macro_use]
47pub mod macros;
48
49pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
50pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
51pub use update::Update;
52pub use media::{UploadedFile, DownloadIter, Photo, Document, Sticker, Downloadable};
53pub use participants::Participant;
54pub use typing_guard::TypingGuard;
55pub use socks5::Socks5Config;
56pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
57pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
58pub use search::{SearchBuilder, GlobalSearchBuilder};
59pub use types::{User, Group, Channel, Chat};
60
61pub use layer_tl_types as tl;
64
65use std::collections::HashMap;
66use std::collections::VecDeque;
67use std::num::NonZeroU32;
68use std::ops::ControlFlow;
69use std::sync::Arc;
70use std::time::Duration;
71
72use layer_mtproto::{EncryptedSession, Session, authentication as auth};
73use layer_tl_types::{Cursor, Deserializable, RemoteCall};
74use session::{DcEntry, PersistedSession};
75use tokio::io::{AsyncReadExt, AsyncWriteExt};
76use tokio::net::TcpStream;
77use tokio::sync::{mpsc, oneshot, Mutex};
78use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
79use tokio::time::sleep;
80use tokio_util::sync::CancellationToken;
81use socket2::TcpKeepalive;
82
// ── TL constructor IDs ──────────────────────────────────────────────────────
// The names follow constructors of the public MTProto / Telegram TL schema.
// NOTE(review): the code that matches on these IDs is outside this excerpt —
// confirm each value against the schema version the crate targets.

/// `rpc_result` constructor ID.
const ID_RPC_RESULT: u32 = 0xf35c6d01;
/// `rpc_error` constructor ID.
const ID_RPC_ERROR: u32 = 0x2144ca19;
/// `msg_container` constructor ID.
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc;
/// `gzip_packed` constructor ID.
const ID_GZIP_PACKED: u32 = 0x3072cfa1;
/// `pong` constructor ID.
const ID_PONG: u32 = 0x347773c5;
/// `msgs_ack` constructor ID.
const ID_MSGS_ACK: u32 = 0x62d6b459;
/// `bad_server_salt` constructor ID.
const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
/// `new_session_created` constructor ID.
const ID_NEW_SESSION: u32 = 0x9ec20908;
/// `bad_msg_notification` constructor ID.
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811;
/// `future_salts` constructor ID.
const ID_FUTURE_SALTS: u32 = 0xae500895;
/// `msg_detailed_info` constructor ID.
const ID_MSG_DETAILED_INFO: u32 = 0x276d3ec6;
/// `msg_new_detailed_info` constructor ID.
const ID_MSG_NEW_DETAIL_INFO: u32 = 0x809db6df;
/// `msg_resend_req` constructor ID.
const ID_MSG_RESEND_REQ: u32 = 0x7d861a08;
/// `updates` constructor ID.
const ID_UPDATES: u32 = 0x74ae4240;
/// `updateShort` constructor ID.
const ID_UPDATE_SHORT: u32 = 0x78d4dec1;
/// `updatesCombined` constructor ID.
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
/// `updateShortMessage` constructor ID.
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
/// `updateShortChatMessage` constructor ID.
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
/// `updatesTooLong` constructor ID.
const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e;

// ── Keepalive / reconnect tuning ────────────────────────────────────────────

/// Ping interval in seconds.
/// NOTE(review): presumably how often a keepalive ping is sent; the consumer
/// is outside this excerpt — confirm.
const PING_DELAY_SECS: u64 = 15;

/// NOTE(review): presumably the disconnect delay (seconds) attached to pings;
/// the consumer is outside this excerpt — confirm.
const NO_PING_DISCONNECT: i32 = 20;

/// Initial reconnect back-off in milliseconds (the supervisor starts its
/// retry delay from this value).
const RECONNECT_BASE_MS: u64 = 500;

/// Upper bound for the reconnect back-off, in seconds (the supervisor caps the
/// doubled delay at `RECONNECT_MAX_SECS * 1_000` ms before applying jitter).
const RECONNECT_MAX_SECS: u64 = 5;

/// NOTE(review): presumably the TCP keepalive idle time in seconds; where it
/// is applied to the socket is outside this excerpt — confirm.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
/// NOTE(review): presumably the interval between TCP keepalive probes, in
/// seconds — confirm at the socket setup site.
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
/// NOTE(review): presumably the number of unanswered TCP keepalive probes
/// tolerated — confirm at the socket setup site.
const TCP_KEEPALIVE_PROBES: u32 = 3;
134
/// In-memory cache of peer access hashes, keyed by peer ID.
///
/// Users and channels are kept in separate maps because their ID spaces are
/// looked up independently when building input peers.
#[derive(Debug, Clone, Default)]
pub struct PeerCache {
    /// User ID → access hash.
    pub users: HashMap<i64, i64>,
    /// Channel ID → access hash.
    pub channels: HashMap<i64, i64>,
}
149
150impl PeerCache {
151 fn cache_user(&mut self, user: &tl::enums::User) {
152 if let tl::enums::User::User(u) = user
153 && let Some(hash) = u.access_hash {
154 self.users.insert(u.id, hash);
155 }
156 }
157
158 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
159 match chat {
160 tl::enums::Chat::Channel(c) => {
161 if let Some(hash) = c.access_hash {
162 self.channels.insert(c.id, hash);
163 }
164 }
165 tl::enums::Chat::ChannelForbidden(c) => {
166 self.channels.insert(c.id, c.access_hash);
167 }
168 _ => {}
169 }
170 }
171
172 fn cache_users(&mut self, users: &[tl::enums::User]) {
173 for u in users { self.cache_user(u); }
174 }
175
176 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
177 for c in chats { self.cache_chat(c); }
178 }
179
180 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
181 if user_id == 0 {
182 return tl::enums::InputPeer::PeerSelf;
183 }
184 let hash = self.users.get(&user_id).copied().unwrap_or(0);
185 tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
186 }
187
188 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
189 let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
190 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
191 }
192
193 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
194 match peer {
195 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
196 tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
197 tl::types::InputPeerChat { chat_id: c.chat_id }
198 ),
199 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
200 }
201 }
202}
203
204#[derive(Clone, Default)]
216pub struct InputMessage {
217 pub text: String,
218 pub reply_to: Option<i32>,
219 pub silent: bool,
220 pub background: bool,
221 pub clear_draft: bool,
222 pub no_webpage: bool,
223 pub invert_media: bool,
225 pub schedule_once_online: bool,
227 pub entities: Option<Vec<tl::enums::MessageEntity>>,
228 pub reply_markup: Option<tl::enums::ReplyMarkup>,
229 pub schedule_date: Option<i32>,
230 pub media: Option<tl::enums::InputMedia>,
233}
234
235impl InputMessage {
236 pub fn text(text: impl Into<String>) -> Self {
238 Self { text: text.into(), ..Default::default() }
239 }
240
241 pub fn set_text(mut self, text: impl Into<String>) -> Self {
243 self.text = text.into(); self
244 }
245
246 pub fn reply_to(mut self, id: Option<i32>) -> Self {
248 self.reply_to = id; self
249 }
250
251 pub fn silent(mut self, v: bool) -> Self {
253 self.silent = v; self
254 }
255
256 pub fn background(mut self, v: bool) -> Self {
258 self.background = v; self
259 }
260
261 pub fn clear_draft(mut self, v: bool) -> Self {
263 self.clear_draft = v; self
264 }
265
266 pub fn no_webpage(mut self, v: bool) -> Self {
268 self.no_webpage = v; self
269 }
270
271 pub fn invert_media(mut self, v: bool) -> Self {
273 self.invert_media = v; self
274 }
275
276 pub fn schedule_once_online(mut self) -> Self {
281 self.schedule_once_online = true;
282 self.schedule_date = None;
283 self
284 }
285
286 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
288 self.entities = Some(e); self
289 }
290
291 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
293 self.reply_markup = Some(rm); self
294 }
295
296 pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
306 self.reply_markup = Some(kb.into()); self
307 }
308
309 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
311 self.schedule_date = ts; self
312 }
313
314 pub fn copy_media(mut self, media: tl::enums::InputMedia) -> Self {
330 self.media = Some(media); self
331 }
332
333 pub fn clear_media(mut self) -> Self {
335 self.media = None; self
336 }
337
338 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
339 self.reply_to.map(|id| {
340 tl::enums::InputReplyTo::Message(
341 tl::types::InputReplyToMessage {
342 reply_to_msg_id: id,
343 top_msg_id: None,
344 reply_to_peer_id: None,
345 quote_text: None,
346 quote_entities: None,
347 quote_offset: None,
348 monoforum_peer_id: None,
349 todo_item_id: None,
350 poll_option: None,
351 }
352 )
353 })
354 }
355}
356
357impl From<&str> for InputMessage {
358 fn from(s: &str) -> Self { Self::text(s) }
359}
360
361impl From<String> for InputMessage {
362 fn from(s: String) -> Self { Self::text(s) }
363}
364
/// Wire transport used for the connection.
///
/// The variant names follow the MTProto transport names. `Abridged` is the
/// default.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum TransportKind {
    /// Abridged transport (the default).
    #[default]
    Abridged,
    /// Intermediate transport.
    Intermediate,
    /// Full transport.
    Full,
    /// Obfuscated transport with an optional 16-byte secret.
    /// NOTE(review): presumably the proxy secret — confirm against the
    /// obfuscated transport implementation.
    Obfuscated { secret: Option<[u8; 16]> },
}
398
/// Cancellation handle returned by `Client::connect` alongside the client.
///
/// Cancelling it makes the background reader task fail all pending requests
/// and exit.
pub type ShutdownToken = CancellationToken;
419
/// Settings consumed by `Client::connect`.
#[derive(Clone)]
pub struct Config {
    /// Application API ID (sent with bot sign-in requests).
    pub api_id: i32,
    /// Application API hash (sent with bot sign-in requests).
    pub api_hash: String,
    /// Optional data-centre address override.
    /// NOTE(review): not read by `connect` in the visible code — confirm where
    /// (or whether) it is honoured.
    pub dc_addr: Option<String>,
    /// Policy deciding how failed requests are retried.
    pub retry_policy: Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy used when opening connections.
    pub socks5: Option<crate::socks5::Socks5Config>,
    /// NOTE(review): presumably permits IPv6 data-centre addresses — the
    /// consumer is outside this excerpt.
    pub allow_ipv6: bool,
    /// Wire transport used for connections.
    pub transport: TransportKind,
    /// Storage backend the session is loaded from and saved to.
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// When `true` and the loaded session has an initialised update state,
    /// `connect` restores that state and replays missed updates in the
    /// background.
    pub catch_up: bool,
}
440
441impl Default for Config {
442 fn default() -> Self {
443 Self {
444 api_id: 0,
445 api_hash: String::new(),
446 dc_addr: None,
447 retry_policy: Arc::new(AutoSleep::default()),
448 socks5: None,
449 allow_ipv6: false,
450 transport: TransportKind::Abridged,
451 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
452 catch_up: false,
453 }
454 }
455}
456
457pub struct UpdateStream {
462 rx: mpsc::UnboundedReceiver<update::Update>,
463}
464
465impl UpdateStream {
466 pub async fn next(&mut self) -> Option<update::Update> {
468 self.rx.recv().await
469 }
470
471 pub async fn next_raw(&mut self) -> Option<update::RawUpdate> {
477 loop {
478 match self.rx.recv().await? {
479 update::Update::Raw(r) => return Some(r),
480 _ => continue,
481 }
482 }
483 }
484}
485
486#[derive(Debug, Clone)]
490pub struct Dialog {
491 pub raw: tl::enums::Dialog,
492 pub message: Option<tl::enums::Message>,
493 pub entity: Option<tl::enums::User>,
494 pub chat: Option<tl::enums::Chat>,
495}
496
497impl Dialog {
498 pub fn title(&self) -> String {
500 if let Some(tl::enums::User::User(u)) = &self.entity {
501 let first = u.first_name.as_deref().unwrap_or("");
502 let last = u.last_name.as_deref().unwrap_or("");
503 let name = format!("{first} {last}").trim().to_string();
504 if !name.is_empty() { return name; }
505 }
506 if let Some(chat) = &self.chat {
507 return match chat {
508 tl::enums::Chat::Chat(c) => c.title.clone(),
509 tl::enums::Chat::Forbidden(c) => c.title.clone(),
510 tl::enums::Chat::Channel(c) => c.title.clone(),
511 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
512 tl::enums::Chat::Empty(_) => "(empty)".into(),
513 };
514 }
515 "(Unknown)".to_string()
516 }
517
518 pub fn peer(&self) -> Option<&tl::enums::Peer> {
520 match &self.raw {
521 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
522 tl::enums::Dialog::Folder(_) => None,
523 }
524 }
525
526 pub fn unread_count(&self) -> i32 {
528 match &self.raw {
529 tl::enums::Dialog::Dialog(d) => d.unread_count,
530 _ => 0,
531 }
532 }
533
534 pub fn top_message(&self) -> i32 {
536 match &self.raw {
537 tl::enums::Dialog::Dialog(d) => d.top_message,
538 _ => 0,
539 }
540 }
541}
542
/// Shared state behind every clone of `Client`.
struct ClientInner {
    /// Write side of the current connection (including its encryption state).
    writer: Mutex<ConnectionWriter>,
    /// Requests awaiting a response; each entry resolves the caller's future
    /// with the raw response bytes or an error.
    /// NOTE(review): the `i64` key is presumably the request's message ID —
    /// confirm at the insertion site.
    #[allow(clippy::type_complexity)]
    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Hands a replacement connection (read half, frame kind, auth key,
    /// session ID) to the reader task.
    reconnect_tx: mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Carries the "network restored" hint sent by
    /// `Client::signal_network_restored` to the reader task.
    network_hint_tx: mpsc::UnboundedSender<()>,
    /// Clone of the token returned from `Client::connect`.
    #[allow(dead_code)]
    shutdown_token: CancellationToken,
    /// Copy of `Config::catch_up`.
    #[allow(dead_code)]
    catch_up: bool,
    /// ID of the data centre the main connection talks to.
    home_dc_id: Mutex<i32>,
    /// Known data centres, keyed by DC ID.
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Cached access hashes for users and channels.
    pub peer_cache: Mutex<PeerCache>,
    /// Local update-sequence state (pts/qts/date/seq and per-channel pts).
    pub pts_state: Mutex<pts::PtsState>,
    /// NOTE(review): presumably buffers updates that arrived ahead of a gap —
    /// confirm in the `pts` module.
    pub possible_gap: Mutex<pts::PossibleGapBuffer>,
    /// Application API ID from the configuration.
    api_id: i32,
    /// Application API hash from the configuration.
    api_hash: String,
    /// Retry policy from the configuration.
    retry_policy: Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy from the configuration.
    socks5: Option<crate::socks5::Socks5Config>,
    /// IPv6 switch from the configuration.
    allow_ipv6: bool,
    /// Transport from the configuration.
    transport: TransportKind,
    /// Backend the session is persisted through.
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Pool of connections to other data centres.
    dc_pool: Mutex<dc_pool::DcPool>,
    /// Sending side of the internal update channel.
    update_tx: mpsc::UnboundedSender<update::Update>,
}
583
/// Cheaply cloneable handle to a connected client; all clones share the same
/// inner state.
#[derive(Clone)]
pub struct Client {
    /// Shared connection and session state.
    pub(crate) inner: Arc<ClientInner>,
    /// Receiving side of the internal update channel; drained by the task
    /// that `stream_updates` spawns.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
590
591impl Client {
/// Connects to Telegram and returns the client together with its shutdown
/// token.
///
/// Steps, in order:
/// 1. Load the persisted session. If it has an entry for its home DC with an
///    auth key, reconnect with that key; in every other case (no session, no
///    matching DC, no key, or the keyed connect fails) open a fresh
///    connection and discard the loaded session.
/// 2. Split the connection, build the shared state and spawn the reader task.
/// 3. Run `init_connection`; if it fails, open a fresh connection, swap the
///    writer, hand the new read half to the reader task and try once more.
/// 4. Restore the peer cache from the loaded session, if any.
/// 5. If `catch_up` is set and the session holds an initialised update state,
///    restore it and replay missed updates in a background task; otherwise
///    sync the update state from the server (errors are ignored).
///
/// NOTE(review): `config.dc_addr` is not read anywhere in this function.
///
/// # Errors
/// Returns the session backend's I/O error, a fresh-connect failure, or the
/// error of the second `init_connection` attempt.
pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
    let (update_tx, update_rx) = mpsc::unbounded_channel();

    // Cloned up front: `config` fields are moved into `ClientInner` later.
    let socks5 = config.socks5.clone();
    let transport = config.transport.clone();

    let (conn, home_dc_id, dc_opts, loaded_session) =
        match config.session_backend.load()
            .map_err(InvocationError::Io)?
        {
            Some(s) => {
                if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
                    if let Some(key) = dc.auth_key {
                        log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
                        match Connection::connect_with_key(
                            &dc.addr, key, dc.first_salt, dc.time_offset,
                            socks5.as_ref(), &transport,
                        ).await {
                            Ok(c) => {
                                // Start from the built-in DC table, then let the
                                // persisted entries override it.
                                let mut opts = session::default_dc_addresses()
                                    .into_iter()
                                    .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
                                    .collect::<HashMap<_, _>>();
                                for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
                                (c, s.home_dc_id, opts, Some(s))
                            }
                            Err(e) => {
                                log::warn!("[layer] Session connect failed ({e}), fresh connect …");
                                let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                                (c, dc, opts, None)
                            }
                        }
                    } else {
                        // Home DC entry exists but carries no auth key.
                        let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                        (c, dc, opts, None)
                    }
                } else {
                    // Session has no entry for its own home DC.
                    let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                    (c, dc, opts, None)
                }
            }
            None => {
                // No persisted session at all.
                let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
                (c, dc, opts, None)
            }
        };

    let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());

    let (writer, read_half, frame_kind) = conn.into_writer();
    // The reader task decrypts frames itself, so it needs its own copy of the
    // key and session ID.
    let auth_key = writer.enc.auth_key_bytes();
    let session_id = writer.enc.session_id();

    #[allow(clippy::type_complexity)]
    let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
        Arc::new(Mutex::new(HashMap::new()));

    // Channel used to hand a replacement connection to the reader task.
    let (reconnect_tx, reconnect_rx) =
        mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();

    let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();


    let shutdown_token = CancellationToken::new();
    let catch_up = config.catch_up;

    let inner = Arc::new(ClientInner {
        writer: Mutex::new(writer),
        pending: pending.clone(),
        reconnect_tx,
        network_hint_tx,
        shutdown_token: shutdown_token.clone(),
        catch_up,
        home_dc_id: Mutex::new(home_dc_id),
        dc_options: Mutex::new(dc_opts),
        peer_cache: Mutex::new(PeerCache::default()),
        pts_state: Mutex::new(pts::PtsState::default()),
        possible_gap: Mutex::new(pts::PossibleGapBuffer::new()),
        api_id: config.api_id,
        api_hash: config.api_hash,
        retry_policy: config.retry_policy,
        socks5: config.socks5,
        allow_ipv6: config.allow_ipv6,
        transport: config.transport,
        session_backend: config.session_backend,
        dc_pool: Mutex::new(pool),
        update_tx,
    });

    let client = Self {
        inner,
        _update_rx: Arc::new(Mutex::new(update_rx)),
    };

    {
        // The reader must be running before `init_connection`, whose response
        // arrives through it.
        let client_r = client.clone();
        let shutdown_r = shutdown_token.clone();
        tokio::spawn(async move {
            client_r.run_reader_task(
                read_half, frame_kind, auth_key, session_id,
                reconnect_rx, network_hint_rx, shutdown_r,
            ).await;
        });
    }

    if let Err(e) = client.init_connection().await {
        log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");

        let socks5_r = client.inner.socks5.clone();
        let transport_r = client.inner.transport.clone();
        let (new_conn, new_dc_id, new_opts) =
            Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;

        {
            let mut dc_guard = client.inner.home_dc_id.lock().await;
            *dc_guard = new_dc_id;
        }
        {
            let mut opts_guard = client.inner.dc_options.lock().await;
            *opts_guard = new_opts;
        }

        let (new_writer, new_read, new_fk) = new_conn.into_writer();
        let new_ak = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        // Swap the writer first, then point the reader at the new read half.
        *client.inner.writer.lock().await = new_writer;
        let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

        // Second failure is fatal for `connect`.
        client.init_connection().await?;
    }

    if let Some(ref s) = loaded_session
        && !s.peers.is_empty() {
        let mut cache = client.inner.peer_cache.lock().await;
        for p in &s.peers {
            // `or_insert`: hashes already cached during this run take precedence.
            if p.is_channel {
                cache.channels.entry(p.id).or_insert(p.access_hash);
            } else {
                cache.users.entry(p.id).or_insert(p.access_hash);
            }
        }
        log::debug!("[layer] Peer cache restored: {} users, {} channels",
            cache.users.len(), cache.channels.len());
    }

    let has_saved_state = loaded_session
        .as_ref()
        .is_some_and(|s| s.updates_state.is_initialised());

    if catch_up && has_saved_state {
        // `has_saved_state` implies `loaded_session` is `Some`.
        let snap = &loaded_session.as_ref().unwrap().updates_state;
        let mut state = client.inner.pts_state.lock().await;
        state.pts = snap.pts;
        state.qts = snap.qts;
        state.date = snap.date;
        state.seq = snap.seq;
        for &(cid, cpts) in &snap.channels {
            state.channel_pts.insert(cid, cpts);
        }
        log::info!("[layer] Update state restored: pts={}, qts={}, seq={}, {} channels",
            state.pts, state.qts, state.seq, state.channel_pts.len());
        // Release the lock before spawning the task that fetches the difference.
        drop(state);

        let c = client.clone();
        let utx = client.inner.update_tx.clone();
        tokio::spawn(async move {
            // Best effort: a failed `get_difference` is silently skipped.
            if let Ok(missed) = c.get_difference().await {
                log::info!("[layer] catch_up: {} missed updates replayed", missed.len());
                for u in missed { let _ = utx.send(u); }
            }
        });
    } else {
        // Best effort: a failure to sync the update state does not fail `connect`.
        let _ = client.sync_pts_state().await;
    }

    Ok((client, shutdown_token))
}
799
800 async fn fresh_connect(
801 socks5: Option<&crate::socks5::Socks5Config>,
802 transport: &TransportKind,
803 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
804 log::info!("[layer] Fresh connect to DC2 …");
805 let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
806 let opts = session::default_dc_addresses()
807 .into_iter()
808 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
809 .collect();
810 Ok((conn, 2, opts))
811 }
812
813 pub async fn save_session(&self) -> Result<(), InvocationError> {
816 use session::{CachedPeer, UpdatesStateSnap};
817
818 let writer_guard = self.inner.writer.lock().await;
819 let home_dc_id = *self.inner.home_dc_id.lock().await;
820 let dc_options = self.inner.dc_options.lock().await;
821
822 let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
823 dc_id: e.dc_id,
824 addr: e.addr.clone(),
825 auth_key: if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
826 first_salt: if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
827 time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
828 }).collect();
829 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
830
831 let pts_snap = {
833 let s = self.inner.pts_state.lock().await;
834 UpdatesStateSnap {
835 pts: s.pts,
836 qts: s.qts,
837 date: s.date,
838 seq: s.seq,
839 channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
840 }
841 };
842
843 let peers: Vec<CachedPeer> = {
845 let cache = self.inner.peer_cache.lock().await;
846 let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
847 for (&id, &hash) in &cache.users { v.push(CachedPeer { id, access_hash: hash, is_channel: false }); }
848 for (&id, &hash) in &cache.channels { v.push(CachedPeer { id, access_hash: hash, is_channel: true }); }
849 v
850 };
851
852 self.inner.session_backend
853 .save(&PersistedSession { home_dc_id, dcs, updates_state: pts_snap, peers })
854 .map_err(InvocationError::Io)?;
855 log::info!("[layer] Session saved ✓");
856 Ok(())
857 }
858
859 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
863 match self.invoke(&tl::functions::updates::GetState {}).await {
864 Ok(_) => Ok(true),
865 Err(e) if e.is("AUTH_KEY_UNREGISTERED")
866 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
867 Err(e) => Err(e),
868 }
869 }
870
871 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
873 let req = tl::functions::auth::ImportBotAuthorization {
874 flags: 0, api_id: self.inner.api_id,
875 api_hash: self.inner.api_hash.clone(),
876 bot_auth_token: token.to_string(),
877 };
878
879 let result = match self.invoke(&req).await {
880 Ok(x) => x,
881 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
882 let dc_id = r.value.unwrap_or(2) as i32;
883 self.migrate_to(dc_id).await?;
884 self.invoke(&req).await?
885 }
886 Err(e) => return Err(e),
887 };
888
889 let name = match result {
890 tl::enums::auth::Authorization::Authorization(a) => {
891 self.cache_user(&a.user).await;
892 Self::extract_user_name(&a.user)
893 }
894 tl::enums::auth::Authorization::SignUpRequired(_) => {
895 panic!("unexpected SignUpRequired during bot sign-in")
896 }
897 };
898 log::info!("[layer] Bot signed in ✓ ({name})");
899 Ok(name)
900 }
901
902 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
904 use tl::enums::auth::SentCode;
905
906 let req = self.make_send_code_req(phone);
907 let body = match self.rpc_call_raw(&req).await {
908 Ok(b) => b,
909 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
910 let dc_id = r.value.unwrap_or(2) as i32;
911 self.migrate_to(dc_id).await?;
912 self.rpc_call_raw(&req).await?
913 }
914 Err(e) => return Err(e),
915 };
916
917 let mut cur = Cursor::from_slice(&body);
918 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
919 SentCode::SentCode(s) => s.phone_code_hash,
920 SentCode::Success(_) => return Err(InvocationError::Deserialize("unexpected Success".into())),
921 SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
922 };
923 log::info!("[layer] Login code sent");
924 Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
925 }
926
927 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
929 let req = tl::functions::auth::SignIn {
930 phone_number: token.phone.clone(),
931 phone_code_hash: token.phone_code_hash.clone(),
932 phone_code: Some(code.trim().to_string()),
933 email_verification: None,
934 };
935
936 let body = match self.rpc_call_raw(&req).await {
937 Ok(b) => b,
938 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
939 let dc_id = r.value.unwrap_or(2) as i32;
940 self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
941 self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
942 }
943 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
944 let t = self.get_password_info().await.map_err(SignInError::Other)?;
945 return Err(SignInError::PasswordRequired(Box::new(t)));
946 }
947 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
948 Err(e) => return Err(SignInError::Other(e)),
949 };
950
951 let mut cur = Cursor::from_slice(&body);
952 match tl::enums::auth::Authorization::deserialize(&mut cur)
953 .map_err(|e| SignInError::Other(e.into()))?
954 {
955 tl::enums::auth::Authorization::Authorization(a) => {
956 self.cache_user(&a.user).await;
957 let name = Self::extract_user_name(&a.user);
958 log::info!("[layer] Signed in ✓ Welcome, {name}!");
959 Ok(name)
960 }
961 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
962 }
963 }
964
965 pub async fn check_password(
967 &self,
968 token: PasswordToken,
969 password: impl AsRef<[u8]>,
970 ) -> Result<String, InvocationError> {
971 let pw = token.password;
972 let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
973 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
974 let g_b = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
975 let a = pw.secure_random;
976 let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
977
978 let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
979 let req = tl::functions::auth::CheckPassword {
980 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
981 tl::types::InputCheckPasswordSrp {
982 srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
983 },
984 ),
985 };
986
987 let body = self.rpc_call_raw(&req).await?;
988 let mut cur = Cursor::from_slice(&body);
989 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
990 tl::enums::auth::Authorization::Authorization(a) => {
991 self.cache_user(&a.user).await;
992 let name = Self::extract_user_name(&a.user);
993 log::info!("[layer] 2FA ✓ Welcome, {name}!");
994 Ok(name)
995 }
996 tl::enums::auth::Authorization::SignUpRequired(_) =>
997 Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
998 }
999 }
1000
1001 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
1003 let req = tl::functions::auth::LogOut {};
1004 match self.rpc_call_raw(&req).await {
1005 Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
1006 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
1007 Err(e) => Err(e),
1008 }
1009 }
1010
1011 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
1015 let req = tl::functions::users::GetUsers {
1016 id: vec![tl::enums::InputUser::UserSelf],
1017 };
1018 let body = self.rpc_call_raw(&req).await?;
1019 let mut cur = Cursor::from_slice(&body);
1020 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1021 self.cache_users_slice(&users).await;
1022 users.into_iter().find_map(|u| match u {
1023 tl::enums::User::User(u) => Some(u),
1024 _ => None,
1025 }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
1026 }
1027
1028 pub fn stream_updates(&self) -> UpdateStream {
1036 let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
1037 let internal_rx = self._update_rx.clone();
1040 tokio::spawn(async move {
1041 let mut guard = internal_rx.lock().await;
1043 while let Some(upd) = guard.recv().await {
1044 if caller_tx.send(upd).is_err() { break; }
1045 }
1046 });
1047 UpdateStream { rx }
1048 }
1049
1050 pub fn signal_network_restored(&self) {
1063 let _ = self.inner.network_hint_tx.send(());
1064 }
1065
/// Supervisor for the reader: runs `reader_loop` and, whenever it exits
/// without a shutdown request, reconnects and starts it again.
///
/// Each iteration:
/// 1. Races `reader_loop` against the shutdown token. On shutdown, every
///    pending request is failed with `InvocationError::Dropped` and the task
///    ends.
/// 2. If the loop exited on its own, fails every pending request with a
///    `ConnectionReset` I/O error.
/// 3. Reconnects with a back-off that starts at `RECONNECT_BASE_MS`, doubles
///    after each failure, is capped at `RECONNECT_MAX_SECS` seconds and then
///    passed through `jitter_delay`. Shutdown is honoured while waiting.
/// 4. Spawns a task that runs `init_connection` (waiting out flood-wait
///    errors), replays missed updates on success, and reports the result to
///    the next `reader_loop` run through a oneshot channel.
///
/// The parameters are the initial read half, frame kind, auth key and session
/// ID, the channel delivering replacement connections, the network-hint
/// channel, and the shutdown token.
#[allow(clippy::too_many_arguments)]
async fn run_reader_task(
    &self,
    read_half: OwnedReadHalf,
    frame_kind: FrameKind,
    auth_key: [u8; 256],
    session_id: i64,
    mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    mut network_hint_rx: mpsc::UnboundedReceiver<()>,
    shutdown_token: CancellationToken,
) {
    let mut rh = read_half;
    let mut fk = frame_kind;
    let mut ak = auth_key;
    let mut sid = session_id;
    // Result channel of the init task spawned after a supervisor reconnect;
    // handed to the next `reader_loop` run.
    let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
    let mut restart_count: u32 = 0;

    loop {
        tokio::select! {
            _ = shutdown_token.cancelled() => {
                log::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
                let mut pending = self.inner.pending.lock().await;
                for (_, tx) in pending.drain() {
                    let _ = tx.send(Err(InvocationError::Dropped));
                }
                return;
            }

            // `rh`, `fk`, `ak` and `sid` are moved in here and re-assigned
            // below before the next iteration.
            _ = self.reader_loop(
                rh, fk, ak, sid,
                restart_init_rx.take(),
                &mut new_conn_rx, &mut network_hint_rx,
            ) => {}
        }

        if shutdown_token.is_cancelled() {
            log::info!("[layer] Reader task: exiting after loop (shutdown).");
            return;
        }

        restart_count += 1;
        log::error!(
            "[layer] Reader loop exited unexpectedly (restart #{restart_count}) — supervisor reconnecting …"
        );

        {
            // Requests in flight can no longer be answered on the old
            // connection; fail them so callers can retry.
            let mut pending = self.inner.pending.lock().await;
            for (_, tx) in pending.drain() {
                let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
                    std::io::ErrorKind::ConnectionReset,
                    "reader task restarted",
                ))));
            }
        }

        let mut delay_ms = RECONNECT_BASE_MS;
        let new_conn = loop {
            log::info!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
            tokio::select! {
                _ = shutdown_token.cancelled() => {
                    log::info!("[layer] Supervisor: shutdown during reconnect, exiting.");
                    return;
                }
                _ = sleep(Duration::from_millis(delay_ms)) => {}
            }

            // NOTE(review): placeholder values are passed to `do_reconnect`;
            // presumably it takes the real key/frame kind from shared state —
            // confirm in `do_reconnect`.
            let dummy_ak = [0u8; 256];
            let dummy_fk = FrameKind::Abridged;
            match self.do_reconnect(&dummy_ak, &dummy_fk).await {
                Ok(conn) => break conn,
                Err(e) => {
                    log::warn!("[layer] Supervisor: reconnect failed ({e})");
                    // Double, cap, then jitter; the jittered value is the base
                    // for the next doubling.
                    let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
                    delay_ms = jitter_delay(next).as_millis() as u64;
                }
            }
        };

        let (new_rh, new_fk, new_ak, new_sid) = new_conn;
        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;

        let (init_tx, init_rx) = oneshot::channel();
        let c = self.clone();
        let utx = self.inner.update_tx.clone();
        // Initialisation runs in its own task: its response can only arrive
        // once the reader loop is running again.
        tokio::spawn(async move {
            let result = loop {
                match c.init_connection().await {
                    Ok(()) => break Ok(()),
                    Err(InvocationError::Rpc(ref r))
                        if r.flood_wait_seconds().is_some() =>
                    {
                        let secs = r.flood_wait_seconds().unwrap();
                        log::warn!(
                            "[layer] Supervisor init_connection FLOOD_WAIT_{secs} — waiting"
                        );
                        sleep(Duration::from_secs(secs + 1)).await;
                    }
                    Err(e) => break Err(e),
                }
            };
            // Best effort: replay whatever was missed while disconnected.
            if result.is_ok()
                && let Ok(missed) = c.get_difference().await {
                for u in missed { let _ = utx.send(u); }
            }
            let _ = init_tx.send(result);
        });
        restart_init_rx = Some(init_rx);

        log::info!("[layer] Supervisor: restarting reader loop (restart #{restart_count}) …");
    }
}
1227
1228 #[allow(clippy::too_many_arguments)]
1229 async fn reader_loop(
1230 &self,
1231 mut rh: OwnedReadHalf,
1232 mut fk: FrameKind,
1233 mut ak: [u8; 256],
1234 mut sid: i64,
1235 initial_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>>,
1238 new_conn_rx: &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1239 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
1240 ) {
1241 let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
1246 let mut init_fail_count: u32 = 0;
1251
1252 loop {
1253 tokio::select! {
1254 outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
1256 match outcome {
1257 FrameOutcome::Frame(mut raw) => {
1258 let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
1259 Ok(m) => m,
1260 Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
1261 };
1262 if msg.salt != 0 {
1263 self.inner.writer.lock().await.enc.salt = msg.salt;
1264 }
1265 self.route_frame(msg.body, msg.msg_id).await;
1266
1267 let acks_to_send = {
1271 let mut w = self.inner.writer.lock().await;
1272 let v: Vec<i64> = w.pending_ack.drain(..).collect();
1273 v
1274 };
1275 if !acks_to_send.is_empty() {
1276 let mut w = self.inner.writer.lock().await;
1277 let fk = w.frame_kind.clone();
1278 let ack_body = build_msgs_ack_body(&acks_to_send);
1279 let (wire, _) = w.enc.pack_body_with_msg_id(&ack_body, false);
1280 send_frame_write(&mut w.write_half, &wire, &fk).await.ok();
1281 log::trace!("[layer] G-04 flushed {} acks", acks_to_send.len());
1282 }
1283 }
1284
1285 FrameOutcome::Error(e) => {
1286 log::warn!("[layer] Reader: connection error: {e}");
1287 drop(init_rx.take()); {
1292 let mut pending = self.inner.pending.lock().await;
1293 let msg = e.to_string();
1294 for (_, tx) in pending.drain() {
1295 let _ = tx.send(Err(InvocationError::Io(
1296 std::io::Error::new(
1297 std::io::ErrorKind::ConnectionReset, msg.clone()))));
1298 }
1299 }
1300
1301 match self.do_reconnect_loop(
1302 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1303 network_hint_rx,
1304 ).await {
1305 Some(rx) => { init_rx = Some(rx); }
1306 None => return, }
1308 }
1309
1310 FrameOutcome::Keepalive => {} }
1312 }
1313
1314 maybe = new_conn_rx.recv() => {
1316 if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
1317 rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
1318 log::info!("[layer] Reader: switched to new connection.");
1319 } else {
1320 break; }
1322 }
1323
1324
1325 init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
1327 init_rx = None;
1328 match init_result {
1329 Ok(Ok(())) => {
1330 init_fail_count = 0;
1331 log::info!("[layer] Reconnected to Telegram ✓ — session live, replaying missed updates …");
1336 for attempt in 1u8..=3 {
1337 match self.save_session().await {
1338 Ok(()) => break,
1339 Err(e) if attempt < 3 => {
1340 log::warn!(
1341 "[layer] save_session failed (attempt {attempt}/3): {e}"
1342 );
1343 sleep(Duration::from_millis(500)).await;
1344 }
1345 Err(e) => {
1346 log::error!(
1347 "[layer] save_session permanently failed after 3 attempts: {e}"
1348 );
1349 }
1350 }
1351 }
1352 }
1353
1354 Ok(Err(e)) => {
1355 let key_is_stale = match &e {
1364 InvocationError::Rpc(r) if r.code == -404 => true,
1366 InvocationError::Io(io) if io.kind() == std::io::ErrorKind::UnexpectedEof
1368 || io.kind() == std::io::ErrorKind::ConnectionReset => true,
1369 _ => false,
1371 };
1372
1373 if key_is_stale {
1374 log::warn!(
1375 "[layer] init_connection failed with definitive bad-key signal ({e}) \
1376 — clearing auth key for fresh DH …"
1377 );
1378 init_fail_count = 0;
1379 let home_dc_id = *self.inner.home_dc_id.lock().await;
1380 let mut opts = self.inner.dc_options.lock().await;
1381 if let Some(entry) = opts.get_mut(&home_dc_id) {
1382 entry.auth_key = None;
1383 }
1384 } else {
1385 init_fail_count += 1;
1386 log::warn!(
1387 "[layer] init_connection failed transiently (attempt {init_fail_count}, {e}) \
1388 — retrying with same key …"
1389 );
1390 }
1391 {
1392 let mut pending = self.inner.pending.lock().await;
1393 let msg = e.to_string();
1394 for (_, tx) in pending.drain() {
1395 let _ = tx.send(Err(InvocationError::Io(
1396 std::io::Error::new(
1397 std::io::ErrorKind::ConnectionReset, msg.clone()))));
1398 }
1399 }
1400 match self.do_reconnect_loop(
1401 0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
1402 ).await {
1403 Some(rx) => { init_rx = Some(rx); }
1404 None => return,
1405 }
1406 }
1407
1408 Err(_) => {
1409 log::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
1411 match self.do_reconnect_loop(
1412 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1413 network_hint_rx,
1414 ).await {
1415 Some(rx) => { init_rx = Some(rx); }
1416 None => return,
1417 }
1418 }
1419 }
1420 }
1421 }
1422 }
1423 }
1424
1425 async fn route_frame(&self, body: Vec<u8>, msg_id: i64) {
1427 if body.len() < 4 { return; }
1428 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1429
1430 match cid {
1431 ID_RPC_RESULT => {
1432 if body.len() < 12 { return; }
1433 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1434 let inner = body[12..].to_vec();
1435 self.inner.writer.lock().await.pending_ack.push(msg_id);
1437 let result = unwrap_envelope(inner);
1438 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1439 self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1441 let to_send = match result {
1442 Ok(EnvelopeResult::Payload(p)) => Ok(p),
1443 Ok(EnvelopeResult::Updates(us)) => {
1444 for u in us { let _ = self.inner.update_tx.send(u); }
1445 Ok(vec![])
1446 }
1447 Ok(EnvelopeResult::None) => Ok(vec![]),
1448 Err(e) => Err(e),
1449 };
1450 let _ = tx.send(to_send);
1451 }
1452 }
1453 ID_RPC_ERROR => {
1454 log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
1455 }
1456 ID_MSG_CONTAINER => {
1457 if body.len() < 8 { return; }
1458 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1459 let mut pos = 8usize;
1460 for _ in 0..count {
1461 if pos + 16 > body.len() { break; }
1462 let inner_msg_id = i64::from_le_bytes(body[pos..pos + 8].try_into().unwrap());
1464 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1465 pos += 16;
1466 if pos + inner_len > body.len() { break; }
1467 let inner = body[pos..pos + inner_len].to_vec();
1468 pos += inner_len;
1469 Box::pin(self.route_frame(inner, inner_msg_id)).await;
1470 }
1471 }
1472 ID_GZIP_PACKED => {
1473 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
1474 if let Ok(inflated) = gz_inflate(&bytes) {
1475 Box::pin(self.route_frame(inflated, msg_id)).await;
1477 }
1478 }
1479 ID_BAD_SERVER_SALT => {
1480 if body.len() >= 24 {
1486 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1487 let new_salt = i64::from_le_bytes(body[16..24].try_into().unwrap());
1488
1489 self.inner.writer.lock().await.enc.salt = new_salt;
1491 log::debug!("[layer] bad_server_salt: bad_msg_id={bad_msg_id} salt={new_salt:#x}");
1492
1493 {
1495 let mut w = self.inner.writer.lock().await;
1496 if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1497 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1498 let fk = w.frame_kind.clone();
1499 w.sent_bodies.insert(new_msg_id, orig_body);
1500 drop(w);
1502 let mut pending = self.inner.pending.lock().await;
1503 if let Some(tx) = pending.remove(&bad_msg_id) {
1504 pending.insert(new_msg_id, tx);
1505 drop(pending);
1506 let mut w = self.inner.writer.lock().await;
1507 if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1508 log::warn!("[layer] bad_server_salt re-send failed: {e}");
1509 } else {
1510 log::debug!("[layer] bad_server_salt re-sent {bad_msg_id}→{new_msg_id}");
1511 }
1512 }
1513 }
1514 }
1515
1516 let inner = Arc::clone(&self.inner);
1518 tokio::spawn(async move {
1519 log::debug!("[layer] G-08 proactive GetFutureSalts …");
1520 let mut req_body = Vec::with_capacity(8);
1522 req_body.extend_from_slice(&0xb921bd04_u32.to_le_bytes());
1523 req_body.extend_from_slice(&64_i32.to_le_bytes());
1524 let (wire, fs_msg_id) = {
1526 let mut w = inner.writer.lock().await;
1527 let (wire, id) = w.enc.pack_body_with_msg_id(&req_body, true);
1528 w.sent_bodies.insert(id, req_body);
1529 (wire, id)
1530 };
1531 let fk = inner.writer.lock().await.frame_kind.clone();
1532 let (tx, rx) = tokio::sync::oneshot::channel();
1533 inner.pending.lock().await.insert(fs_msg_id, tx);
1534 {
1535 let mut w = inner.writer.lock().await;
1536 if send_frame_write(&mut w.write_half, &wire, &fk).await.is_err() {
1537 inner.pending.lock().await.remove(&fs_msg_id);
1538 inner.writer.lock().await.sent_bodies.remove(&fs_msg_id);
1539 return;
1540 }
1541 }
1542 let _ = tokio::time::timeout(
1543 std::time::Duration::from_secs(30), rx
1544 ).await;
1545 });
1546 }
1547 }
1548 ID_PONG => {
1549 if body.len() >= 20 {
1553 let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1554 if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
1555 self.inner.writer.lock().await.sent_bodies.remove(&ping_msg_id);
1556 let _ = tx.send(Ok(body));
1557 }
1558 }
1559 }
1560 ID_FUTURE_SALTS => {
1562 if body.len() >= 12 {
1574 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1575 if body.len() >= 40 {
1576 let count = u32::from_le_bytes(body[20..24].try_into().unwrap()) as usize;
1577 if count > 0 {
1578 let salt_val = i64::from_le_bytes(body[32..40].try_into().unwrap());
1579 self.inner.writer.lock().await.enc.salt = salt_val;
1580 log::debug!("[layer] G-09 FutureSalts: salt={salt_val:#x} count={count}");
1581 }
1582 }
1583 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1584 self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1585 let _ = tx.send(Ok(body));
1586 }
1587 }
1588 }
1589 ID_NEW_SESSION => {
1590 if body.len() >= 28 {
1592 let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
1593 self.inner.writer.lock().await.enc.salt = server_salt;
1594 log::debug!("[layer] new_session_created — salt reset to {server_salt:#x}");
1595 }
1596 }
1597 ID_BAD_MSG_NOTIFY => {
1599 if body.len() < 20 { return; }
1601 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1602 let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
1603 log::warn!("[layer] bad_msg_notification: msg_id={bad_msg_id} code={error_code}");
1604
1605 let resend: Option<(Vec<u8>, i64, FrameKind)> = {
1609 let mut w = self.inner.writer.lock().await;
1610 if error_code == 16 || error_code == 17 {
1612 w.enc.correct_time_offset(bad_msg_id);
1613 }
1614 if error_code == 32 || error_code == 33 {
1616 w.enc.correct_seq_no(error_code);
1617 }
1618 if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1620 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1621 let fk = w.frame_kind.clone();
1622 w.sent_bodies.insert(new_msg_id, orig_body);
1623 Some((wire, new_msg_id, fk))
1624 } else {
1625 None
1626 }
1627 }; match resend {
1630 Some((wire, new_msg_id, fk)) => {
1631 let has_waiter = {
1633 let mut pending = self.inner.pending.lock().await;
1634 if let Some(tx) = pending.remove(&bad_msg_id) {
1635 pending.insert(new_msg_id, tx);
1636 true
1637 } else {
1638 false
1639 }
1640 };
1641 if has_waiter {
1642 let mut w = self.inner.writer.lock().await;
1644 if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1645 log::warn!("[layer] G-02 re-send failed: {e}");
1646 w.sent_bodies.remove(&new_msg_id);
1647 } else {
1648 log::debug!("[layer] G-02 re-sent {bad_msg_id}→{new_msg_id}");
1649 }
1650 } else {
1651 self.inner.writer.lock().await.sent_bodies.remove(&new_msg_id);
1652 }
1653 }
1654 None => {
1655 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
1657 let _ = tx.send(Err(InvocationError::Deserialize(
1658 format!("bad_msg_notification code={error_code}")
1659 )));
1660 }
1661 }
1662 }
1663 }
1664 ID_MSG_DETAILED_INFO => {
1666 if body.len() >= 20 {
1670 let answer_msg_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
1671 self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1672 log::trace!("[layer] G-11 MsgDetailedInfo: queued ack for answer_msg_id={answer_msg_id}");
1673 }
1674 }
1675 ID_MSG_NEW_DETAIL_INFO => {
1676 if body.len() >= 12 {
1679 let answer_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1680 self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1681 log::trace!("[layer] G-11 MsgNewDetailedInfo: queued ack for {answer_msg_id}");
1682 }
1683 }
1684 ID_MSG_RESEND_REQ => {
1686 if body.len() >= 12 {
1691 let count = u32::from_le_bytes(body[8..12].try_into().unwrap()) as usize;
1692 let mut w = self.inner.writer.lock().await;
1693 let fk = w.frame_kind.clone();
1694 for i in 0..count {
1695 let off = 12 + i * 8;
1696 if off + 8 > body.len() { break; }
1697 let resend_id = i64::from_le_bytes(body[off..off + 8].try_into().unwrap());
1698 if let Some(orig_body) = w.sent_bodies.remove(&resend_id) {
1699 let (wire, new_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1700 let mut pending = self.inner.pending.lock().await;
1702 if let Some(tx) = pending.remove(&resend_id) {
1703 pending.insert(new_id, tx);
1704 }
1705 drop(pending);
1706 w.sent_bodies.insert(new_id, orig_body);
1707 send_frame_write(&mut w.write_half, &wire, &fk).await.ok();
1708 log::debug!("[layer] G-14 MsgResendReq: resent {resend_id} → {new_id}");
1709 }
1710 }
1711 }
1712 }
1713 0xe22045fc => {
1715 log::warn!("[layer] destroy_session_ok received — session terminated by server");
1716 }
1717 0x62d350c9 => {
1718 log::warn!("[layer] destroy_session_none received — session was already gone");
1719 }
1720 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
1721 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
1722 | ID_UPDATES_TOO_LONG => {
1723 self.inner.writer.lock().await.pending_ack.push(msg_id);
1725 self.dispatch_updates(&body).await;
1727 }
1728 _ => {}
1729 }
1730 }
1731
1732
1733 async fn dispatch_updates(&self, body: &[u8]) {
1738 if body.len() < 4 { return; }
1739 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1740
1741 if cid == 0xe317af7e_u32 {
1743 log::warn!("[layer] updatesTooLong — getDifference");
1744 match self.get_difference().await {
1745 Ok(updates) => { for u in updates { let _ = self.inner.update_tx.send(u); } }
1746 Err(e) => log::warn!("[layer] getDifference after updatesTooLong: {e}"),
1747 }
1748 return;
1749 }
1750
1751 if cid == 0x313bc7f8 || cid == 0x4d6deea5 {
1754 for u in update::parse_updates(body) {
1758 let _ = self.inner.update_tx.send(u);
1759 }
1760 return;
1761 }
1762
1763 {
1765 use layer_tl_types::{Cursor, Deserializable};
1766 let mut cur = Cursor::from_slice(body);
1767 let seq_info: Option<(i32, i32)> = match cid {
1768 0x74ae4240 => {
1769 match tl::enums::Updates::deserialize(&mut cur) {
1771 Ok(tl::enums::Updates::Updates(u)) => Some((u.seq, u.seq)),
1772 _ => None,
1773 }
1774 }
1775 0x725b04c3 => {
1776 match tl::enums::Updates::deserialize(&mut cur) {
1778 Ok(tl::enums::Updates::Combined(u)) => Some((u.seq, u.seq_start)),
1779 _ => None,
1780 }
1781 }
1782 _ => None,
1783 };
1784 if let Some((seq, seq_start)) = seq_info
1785 && seq != 0 {
1786 match self.check_and_fill_seq_gap(seq, seq_start).await {
1787 Ok(extra) => { for u in extra { let _ = self.inner.update_tx.send(u); } }
1788 Err(e) => log::warn!("[layer] seq gap fill: {e}"),
1789 }
1790 }
1791 }
1792
1793 use layer_tl_types::{Cursor, Deserializable};
1795 let mut cur = Cursor::from_slice(body);
1796 let raw: Vec<tl::enums::Update> = match cid {
1797 0x78d4dec1 => { match tl::types::UpdateShort::deserialize(&mut cur) {
1799 Ok(u) => vec![u.update],
1800 Err(_) => vec![],
1801 }
1802 }
1803 0x74ae4240 => { match tl::enums::Updates::deserialize(&mut cur) {
1805 Ok(tl::enums::Updates::Updates(u)) => u.updates,
1806 _ => vec![],
1807 }
1808 }
1809 0x725b04c3 => { match tl::enums::Updates::deserialize(&mut cur) {
1811 Ok(tl::enums::Updates::Combined(u)) => u.updates,
1812 _ => vec![],
1813 }
1814 }
1815 _ => vec![],
1816 };
1817
1818 for upd in raw {
1819 self.dispatch_single_update(upd).await;
1820 }
1821 }
1822
1823 async fn dispatch_single_update(&self, upd: tl::enums::Update) {
1826 enum Kind {
1829 GlobalPts { pts: i32, pts_count: i32, carry: bool },
1830 ChannelPts { channel_id: i64, pts: i32, pts_count: i32, carry: bool },
1831 Qts { qts: i32 },
1832 Passthrough,
1833 }
1834
1835 fn ch_from_msg(msg: &tl::enums::Message) -> i64 {
1836 if let tl::enums::Message::Message(m) = msg
1837 && let tl::enums::Peer::Channel(c) = &m.peer_id { return c.channel_id; }
1838 0
1839 }
1840
1841 let kind = {
1842 use tl::enums::Update::*;
1843 match &upd {
1844 NewMessage(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
1845 EditMessage(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
1846 DeleteMessages(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
1847 ReadHistoryInbox(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
1848 ReadHistoryOutbox(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
1849 NewChannelMessage(u) => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
1850 EditChannelMessage(u) => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
1851 DeleteChannelMessages(u) => Kind::ChannelPts { channel_id: u.channel_id, pts: u.pts, pts_count: u.pts_count, carry: true },
1852 NewEncryptedMessage(u) => Kind::Qts { qts: u.qts },
1853 _ => Kind::Passthrough,
1854 }
1855 };
1856
1857 let high = update::from_single_update_pub(upd);
1858
1859 let to_send: Vec<update::Update> = match kind {
1860 Kind::GlobalPts { pts, pts_count, carry } => {
1861 let first = if carry { high.into_iter().next() } else { None };
1862 match self.check_and_fill_gap(pts, pts_count, first).await {
1863 Ok(v) => v,
1864 Err(e) => { log::warn!("[layer] pts gap: {e}"); vec![] }
1865 }
1866 }
1867 Kind::ChannelPts { channel_id, pts, pts_count, carry } => {
1868 let first = if carry { high.into_iter().next() } else { None };
1869 if channel_id != 0 {
1870 match self.check_and_fill_channel_gap(channel_id, pts, pts_count, first).await {
1871 Ok(v) => v,
1872 Err(e) => { log::warn!("[layer] ch pts gap: {e}"); vec![] }
1873 }
1874 } else {
1875 first.into_iter().collect()
1876 }
1877 }
1878 Kind::Qts { qts } => {
1879 match self.check_and_fill_qts_gap(qts, 1).await {
1880 Ok(_) => vec![],
1881 Err(e) => { log::warn!("[layer] qts gap: {e}"); vec![] }
1882 }
1883 }
1884 Kind::Passthrough => high,
1885 };
1886
1887 for u in to_send {
1888 let _ = self.inner.update_tx.send(u);
1889 }
1890 }
1891
1892 async fn do_reconnect_loop(
1902 &self,
1903 initial_delay_ms: u64,
1904 rh: &mut OwnedReadHalf,
1905 fk: &mut FrameKind,
1906 ak: &mut [u8; 256],
1907 sid: &mut i64,
1908 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
1909 ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
1910 let mut delay_ms = if initial_delay_ms == 0 {
1911 0
1914 } else {
1915 initial_delay_ms.max(RECONNECT_BASE_MS)
1916 };
1917 loop {
1918 log::info!("[layer] Reconnecting in {delay_ms} ms …");
1919 tokio::select! {
1920 _ = sleep(Duration::from_millis(delay_ms)) => {}
1921 hint = network_hint_rx.recv() => {
1922 hint?; log::info!("[layer] Network hint → skipping backoff, reconnecting now");
1924 }
1925 }
1926
1927 match self.do_reconnect(ak, fk).await {
1928 Ok((new_rh, new_fk, new_ak, new_sid)) => {
1929 *rh = new_rh; *fk = new_fk; *ak = new_ak; *sid = new_sid;
1930 log::info!("[layer] TCP reconnected ✓ — initialising session …");
1931
1932 let (init_tx, init_rx) = oneshot::channel();
1936 let c = self.clone();
1937 let utx = self.inner.update_tx.clone();
1938 tokio::spawn(async move {
1939 let result = loop {
1944 match c.init_connection().await {
1945 Ok(()) => break Ok(()),
1946 Err(InvocationError::Rpc(ref r))
1947 if r.flood_wait_seconds().is_some() =>
1948 {
1949 let secs = r.flood_wait_seconds().unwrap();
1950 log::warn!(
1951 "[layer] init_connection FLOOD_WAIT_{secs} — waiting before retry"
1952 );
1953 sleep(Duration::from_secs(secs + 1)).await;
1954 }
1956 Err(e) => break Err(e),
1957 }
1958 };
1959 if result.is_ok() {
1960 if let Ok(missed) = c.get_difference().await {
1962 for u in missed { let _ = utx.send(u); }
1963 }
1964 }
1965 let _ = init_tx.send(result);
1966 });
1967 return Some(init_rx);
1968 }
1969 Err(e) => {
1970 log::warn!("[layer] Reconnect attempt failed: {e}");
1971 let next = delay_ms.saturating_mul(2).clamp(RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1_000);
1975 delay_ms = jitter_delay(next).as_millis() as u64;
1976 }
1977 }
1978 }
1979 }
1980
1981 async fn do_reconnect(
1983 &self,
1984 _old_auth_key: &[u8; 256],
1985 _old_frame_kind: &FrameKind,
1986 ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
1987 let home_dc_id = *self.inner.home_dc_id.lock().await;
1988 let (addr, saved_key, first_salt, time_offset) = {
1989 let opts = self.inner.dc_options.lock().await;
1990 match opts.get(&home_dc_id) {
1991 Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
1992 None => ("149.154.167.51:443".to_string(), None, 0, 0),
1993 }
1994 };
1995 let socks5 = self.inner.socks5.clone();
1996 let transport = self.inner.transport.clone();
1997
1998 let new_conn = if let Some(key) = saved_key {
1999 log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
2000 match Connection::connect_with_key(
2001 &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
2002 ).await {
2003 Ok(c) => c,
2004 Err(e2) => {
2005 log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
2006 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
2007 }
2008 }
2009 } else {
2010 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
2011 };
2012
2013 let (new_writer, new_read, new_fk) = new_conn.into_writer();
2014 let new_ak = new_writer.enc.auth_key_bytes();
2015 let new_sid = new_writer.enc.session_id();
2016 *self.inner.writer.lock().await = new_writer;
2017
2018 Ok((new_read, new_fk, new_ak, new_sid))
2030 }
2031
2032 pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
2036 let p = self.resolve_peer(peer).await?;
2037 self.send_message_to_peer(p, text).await
2038 }
2039
2040 pub async fn send_message_to_peer(
2042 &self,
2043 peer: tl::enums::Peer,
2044 text: &str,
2045 ) -> Result<(), InvocationError> {
2046 self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
2047 }
2048
2049 pub async fn send_message_to_peer_ex(
2051 &self,
2052 peer: tl::enums::Peer,
2053 msg: &InputMessage,
2054 ) -> Result<(), InvocationError> {
2055 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2056 let schedule = if msg.schedule_once_online {
2057 Some(0x7FFF_FFFEi32)
2058 } else {
2059 msg.schedule_date
2060 };
2061
2062 if let Some(media) = &msg.media {
2064 let req = tl::functions::messages::SendMedia {
2065 silent: msg.silent,
2066 background: msg.background,
2067 clear_draft: msg.clear_draft,
2068 noforwards: false,
2069 update_stickersets_order: false,
2070 invert_media: msg.invert_media,
2071 allow_paid_floodskip: false,
2072 peer: input_peer,
2073 reply_to: msg.reply_header(),
2074 media: media.clone(),
2075 message: msg.text.clone(),
2076 random_id: random_i64(),
2077 reply_markup: msg.reply_markup.clone(),
2078 entities: msg.entities.clone(),
2079 schedule_date: schedule,
2080 schedule_repeat_period: None,
2081 send_as: None,
2082 quick_reply_shortcut: None,
2083 effect: None,
2084 allow_paid_stars: None,
2085 suggested_post: None,
2086 };
2087 return self.rpc_call_raw_pub(&req).await.map(|_| ());
2088 }
2089
2090 let req = tl::functions::messages::SendMessage {
2091 no_webpage: msg.no_webpage,
2092 silent: msg.silent,
2093 background: msg.background,
2094 clear_draft: msg.clear_draft,
2095 noforwards: false,
2096 update_stickersets_order: false,
2097 invert_media: msg.invert_media,
2098 allow_paid_floodskip: false,
2099 peer: input_peer,
2100 reply_to: msg.reply_header(),
2101 message: msg.text.clone(),
2102 random_id: random_i64(),
2103 reply_markup: msg.reply_markup.clone(),
2104 entities: msg.entities.clone(),
2105 schedule_date: schedule,
2106 schedule_repeat_period: None,
2107 send_as: None,
2108 quick_reply_shortcut: None,
2109 effect: None,
2110 allow_paid_stars: None,
2111 suggested_post: None,
2112 };
2113 self.rpc_write(&req).await
2114 }
2115
2116 pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
2118 let req = tl::functions::messages::SendMessage {
2119 no_webpage: false,
2120 silent: false,
2121 background: false,
2122 clear_draft: false,
2123 noforwards: false,
2124 update_stickersets_order: false,
2125 invert_media: false,
2126 allow_paid_floodskip: false,
2127 peer: tl::enums::InputPeer::PeerSelf,
2128 reply_to: None,
2129 message: text.to_string(),
2130 random_id: random_i64(),
2131 reply_markup: None,
2132 entities: None,
2133 schedule_date: None,
2134 schedule_repeat_period: None,
2135 send_as: None,
2136 quick_reply_shortcut: None,
2137 effect: None,
2138 allow_paid_stars: None,
2139 suggested_post: None,
2140 };
2141 self.rpc_write(&req).await
2142 }
2143
2144 pub async fn edit_message(
2146 &self,
2147 peer: tl::enums::Peer,
2148 message_id: i32,
2149 new_text: &str,
2150 ) -> Result<(), InvocationError> {
2151 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2152 let req = tl::functions::messages::EditMessage {
2153 no_webpage: false,
2154 invert_media: false,
2155 peer: input_peer,
2156 id: message_id,
2157 message: Some(new_text.to_string()),
2158 media: None,
2159 reply_markup: None,
2160 entities: None,
2161 schedule_date: None,
2162 schedule_repeat_period: None,
2163 quick_reply_shortcut_id: None,
2164 };
2165 self.rpc_write(&req).await
2166 }
2167
2168 pub async fn forward_messages(
2170 &self,
2171 destination: tl::enums::Peer,
2172 message_ids: &[i32],
2173 source: tl::enums::Peer,
2174 ) -> Result<(), InvocationError> {
2175 let cache = self.inner.peer_cache.lock().await;
2176 let to_peer = cache.peer_to_input(&destination);
2177 let from_peer = cache.peer_to_input(&source);
2178 drop(cache);
2179
2180 let req = tl::functions::messages::ForwardMessages {
2181 silent: false,
2182 background: false,
2183 with_my_score: false,
2184 drop_author: false,
2185 drop_media_captions: false,
2186 noforwards: false,
2187 from_peer,
2188 id: message_ids.to_vec(),
2189 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
2190 to_peer,
2191 top_msg_id: None,
2192 reply_to: None,
2193 schedule_date: None,
2194 schedule_repeat_period: None,
2195 send_as: None,
2196 quick_reply_shortcut: None,
2197 effect: None,
2198 video_timestamp: None,
2199 allow_paid_stars: None,
2200 allow_paid_floodskip: false,
2201 suggested_post: None,
2202 };
2203 self.rpc_write(&req).await
2204 }
2205
2206 pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
2208 let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
2209 self.rpc_write(&req).await
2210 }
2211
2212 pub async fn get_messages_by_id(
2214 &self,
2215 peer: tl::enums::Peer,
2216 ids: &[i32],
2217 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2218 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2219 let id_list: Vec<tl::enums::InputMessage> = ids.iter()
2220 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
2221 .collect();
2222 let req = tl::functions::channels::GetMessages {
2223 channel: match &input_peer {
2224 tl::enums::InputPeer::Channel(c) =>
2225 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2226 channel_id: c.channel_id, access_hash: c.access_hash
2227 }),
2228 _ => return self.get_messages_user(input_peer, id_list).await,
2229 },
2230 id: id_list,
2231 };
2232 let body = self.rpc_call_raw(&req).await?;
2233 let mut cur = Cursor::from_slice(&body);
2234 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2235 tl::enums::messages::Messages::Messages(m) => m.messages,
2236 tl::enums::messages::Messages::Slice(m) => m.messages,
2237 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2238 tl::enums::messages::Messages::NotModified(_) => vec![],
2239 };
2240 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2241 }
2242
2243 async fn get_messages_user(
2244 &self,
2245 _peer: tl::enums::InputPeer,
2246 ids: Vec<tl::enums::InputMessage>,
2247 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2248 let req = tl::functions::messages::GetMessages { id: ids };
2249 let body = self.rpc_call_raw(&req).await?;
2250 let mut cur = Cursor::from_slice(&body);
2251 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2252 tl::enums::messages::Messages::Messages(m) => m.messages,
2253 tl::enums::messages::Messages::Slice(m) => m.messages,
2254 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2255 tl::enums::messages::Messages::NotModified(_) => vec![],
2256 };
2257 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2258 }
2259
2260 pub async fn get_pinned_message(
2262 &self,
2263 peer: tl::enums::Peer,
2264 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
2265 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2266 let req = tl::functions::messages::Search {
2267 peer: input_peer,
2268 q: String::new(),
2269 from_id: None,
2270 saved_peer_id: None,
2271 saved_reaction: None,
2272 top_msg_id: None,
2273 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
2274 min_date: 0,
2275 max_date: 0,
2276 offset_id: 0,
2277 add_offset: 0,
2278 limit: 1,
2279 max_id: 0,
2280 min_id: 0,
2281 hash: 0,
2282 };
2283 let body = self.rpc_call_raw(&req).await?;
2284 let mut cur = Cursor::from_slice(&body);
2285 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2286 tl::enums::messages::Messages::Messages(m) => m.messages,
2287 tl::enums::messages::Messages::Slice(m) => m.messages,
2288 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2289 tl::enums::messages::Messages::NotModified(_) => vec![],
2290 };
2291 Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
2292 }
2293
2294 pub async fn pin_message(
2296 &self,
2297 peer: tl::enums::Peer,
2298 message_id: i32,
2299 silent: bool,
2300 unpin: bool,
2301 pm_oneside: bool,
2302 ) -> Result<(), InvocationError> {
2303 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2304 let req = tl::functions::messages::UpdatePinnedMessage {
2305 silent,
2306 unpin,
2307 pm_oneside,
2308 peer: input_peer,
2309 id: message_id,
2310 };
2311 self.rpc_write(&req).await
2312 }
2313
2314 pub async fn unpin_message(
2316 &self,
2317 peer: tl::enums::Peer,
2318 message_id: i32,
2319 ) -> Result<(), InvocationError> {
2320 self.pin_message(peer, message_id, true, true, false).await
2321 }
2322
2323 pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2325 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2326 let req = tl::functions::messages::UnpinAllMessages {
2327 peer: input_peer,
2328 top_msg_id: None,
2329 saved_peer_id: None,
2330 };
2331 self.rpc_write(&req).await
2332 }
2333
2334 pub async fn search_messages(
2339 &self,
2340 peer: tl::enums::Peer,
2341 query: &str,
2342 limit: i32,
2343 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2344 self.search(peer, query).limit(limit).fetch(self).await
2345 }
2346
2347 pub fn search(&self, peer: tl::enums::Peer, query: &str) -> SearchBuilder {
2349 SearchBuilder::new(peer, query.to_string())
2350 }
2351
2352 pub async fn search_global(
2354 &self,
2355 query: &str,
2356 limit: i32,
2357 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2358 self.search_global_builder(query).limit(limit).fetch(self).await
2359 }
2360
2361 pub fn search_global_builder(&self, query: &str) -> GlobalSearchBuilder {
2363 GlobalSearchBuilder::new(query.to_string())
2364 }
2365
2366 pub async fn get_scheduled_messages(
2383 &self,
2384 peer: tl::enums::Peer,
2385 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2386 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2387 let req = tl::functions::messages::GetScheduledHistory {
2388 peer: input_peer,
2389 hash: 0,
2390 };
2391 let body = self.rpc_call_raw(&req).await?;
2392 let mut cur = Cursor::from_slice(&body);
2393 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2394 tl::enums::messages::Messages::Messages(m) => m.messages,
2395 tl::enums::messages::Messages::Slice(m) => m.messages,
2396 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2397 tl::enums::messages::Messages::NotModified(_) => vec![],
2398 };
2399 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2400 }
2401
2402 pub async fn delete_scheduled_messages(
2404 &self,
2405 peer: tl::enums::Peer,
2406 ids: Vec<i32>,
2407 ) -> Result<(), InvocationError> {
2408 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2409 let req = tl::functions::messages::DeleteScheduledMessages {
2410 peer: input_peer,
2411 id: ids,
2412 };
2413 self.rpc_write(&req).await
2414 }
2415
2416 pub async fn edit_inline_message(
2434 &self,
2435 id: tl::enums::InputBotInlineMessageId,
2436 new_text: &str,
2437 reply_markup: Option<tl::enums::ReplyMarkup>,
2438 ) -> Result<bool, InvocationError> {
2439 let req = tl::functions::messages::EditInlineBotMessage {
2440 no_webpage: false,
2441 invert_media: false,
2442 id,
2443 message: Some(new_text.to_string()),
2444 media: None,
2445 reply_markup,
2446 entities: None,
2447 };
2448 let body = self.rpc_call_raw(&req).await?;
2449 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2451 }
2452
2453 pub async fn answer_callback_query(
2455 &self,
2456 query_id: i64,
2457 text: Option<&str>,
2458 alert: bool,
2459 ) -> Result<bool, InvocationError> {
2460 let req = tl::functions::messages::SetBotCallbackAnswer {
2461 alert,
2462 query_id,
2463 message: text.map(|s| s.to_string()),
2464 url: None,
2465 cache_time: 0,
2466 };
2467 let body = self.rpc_call_raw(&req).await?;
2468 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2469 }
2470
2471 pub async fn answer_inline_query(
2472 &self,
2473 query_id: i64,
2474 results: Vec<tl::enums::InputBotInlineResult>,
2475 cache_time: i32,
2476 is_personal: bool,
2477 next_offset: Option<String>,
2478 ) -> Result<bool, InvocationError> {
2479 let req = tl::functions::messages::SetInlineBotResults {
2480 gallery: false,
2481 private: is_personal,
2482 query_id,
2483 results,
2484 cache_time,
2485 next_offset,
2486 switch_pm: None,
2487 switch_webview: None,
2488 };
2489 let body = self.rpc_call_raw(&req).await?;
2490 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2491 }
2492
2493 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
2497 let req = tl::functions::messages::GetDialogs {
2498 exclude_pinned: false,
2499 folder_id: None,
2500 offset_date: 0,
2501 offset_id: 0,
2502 offset_peer: tl::enums::InputPeer::Empty,
2503 limit,
2504 hash: 0,
2505 };
2506
2507 let body = self.rpc_call_raw(&req).await?;
2508 let mut cur = Cursor::from_slice(&body);
2509 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2510 tl::enums::messages::Dialogs::Dialogs(d) => d,
2511 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
2512 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2513 },
2514 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2515 };
2516
2517 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2519 .map(|m| {
2520 let id = match &m {
2521 tl::enums::Message::Message(x) => x.id,
2522 tl::enums::Message::Service(x) => x.id,
2523 tl::enums::Message::Empty(x) => x.id,
2524 };
2525 (id, m)
2526 })
2527 .collect();
2528
2529 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2531 .filter_map(|u| {
2532 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2533 })
2534 .collect();
2535
2536 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2538 .filter_map(|c| {
2539 let id = match &c {
2540 tl::enums::Chat::Chat(x) => x.id,
2541 tl::enums::Chat::Forbidden(x) => x.id,
2542 tl::enums::Chat::Channel(x) => x.id,
2543 tl::enums::Chat::ChannelForbidden(x) => x.id,
2544 tl::enums::Chat::Empty(x) => x.id,
2545 };
2546 Some((id, c))
2547 })
2548 .collect();
2549
2550 {
2552 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2553 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2554 self.cache_users_slice(&u_list).await;
2555 self.cache_chats_slice(&c_list).await;
2556 }
2557
2558 let result = raw.dialogs.into_iter().map(|d| {
2559 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2560 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2561
2562 let message = msg_map.get(&top_id).cloned();
2563 let entity = peer.and_then(|p| match p {
2564 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2565 _ => None,
2566 });
2567 let chat = peer.and_then(|p| match p {
2568 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2569 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2570 _ => None,
2571 });
2572
2573 Dialog { raw: d, message, entity, chat }
2574 }).collect();
2575
2576 Ok(result)
2577 }
2578
2579 #[allow(dead_code)]
2581 async fn get_dialogs_raw(
2582 &self,
2583 req: tl::functions::messages::GetDialogs,
2584 ) -> Result<Vec<Dialog>, InvocationError> {
2585 let body = self.rpc_call_raw(&req).await?;
2586 let mut cur = Cursor::from_slice(&body);
2587 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2588 tl::enums::messages::Dialogs::Dialogs(d) => d,
2589 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
2590 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2591 },
2592 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2593 };
2594
2595 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2596 .map(|m| {
2597 let id = match &m {
2598 tl::enums::Message::Message(x) => x.id,
2599 tl::enums::Message::Service(x) => x.id,
2600 tl::enums::Message::Empty(x) => x.id,
2601 };
2602 (id, m)
2603 })
2604 .collect();
2605
2606 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2607 .filter_map(|u| {
2608 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2609 })
2610 .collect();
2611
2612 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2613 .filter_map(|c| {
2614 let id = match &c {
2615 tl::enums::Chat::Chat(x) => x.id,
2616 tl::enums::Chat::Forbidden(x) => x.id,
2617 tl::enums::Chat::Channel(x) => x.id,
2618 tl::enums::Chat::ChannelForbidden(x) => x.id,
2619 tl::enums::Chat::Empty(x) => x.id,
2620 };
2621 Some((id, c))
2622 })
2623 .collect();
2624
2625 {
2626 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2627 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2628 self.cache_users_slice(&u_list).await;
2629 self.cache_chats_slice(&c_list).await;
2630 }
2631
2632 let result = raw.dialogs.into_iter().map(|d| {
2633 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2634 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2635
2636 let message = msg_map.get(&top_id).cloned();
2637 let entity = peer.and_then(|p| match p {
2638 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2639 _ => None,
2640 });
2641 let chat = peer.and_then(|p| match p {
2642 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2643 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2644 _ => None,
2645 });
2646
2647 Dialog { raw: d, message, entity, chat }
2648 }).collect();
2649
2650 Ok(result)
2651 }
2652
2653 async fn get_dialogs_raw_with_count(
2655 &self,
2656 req: tl::functions::messages::GetDialogs,
2657 ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
2658 let body = self.rpc_call_raw(&req).await?;
2659 let mut cur = Cursor::from_slice(&body);
2660 let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2661 tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
2662 tl::enums::messages::Dialogs::Slice(d) => {
2663 let cnt = Some(d.count);
2664 (tl::types::messages::Dialogs {
2665 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2666 }, cnt)
2667 }
2668 tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
2669 };
2670
2671 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2672 .filter_map(|m| {
2673 let id = match &m {
2674 tl::enums::Message::Message(x) => x.id,
2675 tl::enums::Message::Service(x) => x.id,
2676 tl::enums::Message::Empty(x) => x.id,
2677 };
2678 Some((id, m))
2679 }).collect();
2680
2681 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2682 .filter_map(|u| {
2683 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2684 }).collect();
2685
2686 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2687 .filter_map(|c| {
2688 let id = match &c {
2689 tl::enums::Chat::Chat(x) => x.id,
2690 tl::enums::Chat::Forbidden(x) => x.id,
2691 tl::enums::Chat::Channel(x) => x.id,
2692 tl::enums::Chat::ChannelForbidden(x) => x.id,
2693 tl::enums::Chat::Empty(x) => x.id,
2694 };
2695 Some((id, c))
2696 }).collect();
2697
2698 {
2699 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2700 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2701 self.cache_users_slice(&u_list).await;
2702 self.cache_chats_slice(&c_list).await;
2703 }
2704
2705 let result = raw.dialogs.into_iter().map(|d| {
2706 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2707 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2708 let message = msg_map.get(&top_id).cloned();
2709 let entity = peer.and_then(|p| match p {
2710 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2711 _ => None,
2712 });
2713 let chat = peer.and_then(|p| match p {
2714 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2715 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2716 _ => None,
2717 });
2718 Dialog { raw: d, message, entity, chat }
2719 }).collect();
2720
2721 Ok((result, count))
2722 }
2723
2724 async fn get_messages_with_count(
2726 &self,
2727 peer: tl::enums::InputPeer,
2728 limit: i32,
2729 offset_id: i32,
2730 ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
2731 let req = tl::functions::messages::GetHistory {
2732 peer, offset_id, offset_date: 0, add_offset: 0,
2733 limit, max_id: 0, min_id: 0, hash: 0,
2734 };
2735 let body = self.rpc_call_raw(&req).await?;
2736 let mut cur = Cursor::from_slice(&body);
2737 let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2738 tl::enums::messages::Messages::Messages(m) => (m.messages, None),
2739 tl::enums::messages::Messages::Slice(m) => {
2740 let cnt = Some(m.count);
2741 (m.messages, cnt)
2742 }
2743 tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
2744 tl::enums::messages::Messages::NotModified(_) => (vec![], None),
2745 };
2746 Ok((msgs.into_iter().map(update::IncomingMessage::from_raw).collect(), count))
2747 }
2748
2749 pub async fn download_media_to_file(
2760 &self,
2761 location: tl::enums::InputFileLocation,
2762 path: impl AsRef<std::path::Path>,
2763 ) -> Result<(), InvocationError> {
2764 let bytes = self.download_media(location).await?;
2765 std::fs::write(path, &bytes).map_err(InvocationError::Io)?;
2766 Ok(())
2767 }
2768
2769 pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2770 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2771 let req = tl::functions::messages::DeleteHistory {
2772 just_clear: false,
2773 revoke: false,
2774 peer: input_peer,
2775 max_id: 0,
2776 min_date: None,
2777 max_date: None,
2778 };
2779 self.rpc_write(&req).await
2780 }
2781
2782 pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2784 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2785 match &input_peer {
2786 tl::enums::InputPeer::Channel(c) => {
2787 let req = tl::functions::channels::ReadHistory {
2788 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2789 channel_id: c.channel_id, access_hash: c.access_hash,
2790 }),
2791 max_id: 0,
2792 };
2793 self.rpc_call_raw(&req).await?;
2794 }
2795 _ => {
2796 let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
2797 self.rpc_call_raw(&req).await?;
2798 }
2799 }
2800 Ok(())
2801 }
2802
2803 pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2805 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2806 let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
2807 self.rpc_write(&req).await
2808 }
2809
2810 pub async fn send_chat_action(
2818 &self,
2819 peer: tl::enums::Peer,
2820 action: tl::enums::SendMessageAction,
2821 ) -> Result<(), InvocationError> {
2822 self.send_chat_action_ex(peer, action, None).await
2823 }
2824
2825 pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2829 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2830 match input_peer {
2831 tl::enums::InputPeer::Channel(c) => {
2832 let req = tl::functions::channels::JoinChannel {
2833 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2834 channel_id: c.channel_id, access_hash: c.access_hash,
2835 }),
2836 };
2837 self.rpc_call_raw(&req).await?;
2838 }
2839 tl::enums::InputPeer::Chat(c) => {
2840 let req = tl::functions::messages::AddChatUser {
2841 chat_id: c.chat_id,
2842 user_id: tl::enums::InputUser::UserSelf,
2843 fwd_limit: 0,
2844 };
2845 self.rpc_call_raw(&req).await?;
2846 }
2847 _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
2848 }
2849 Ok(())
2850 }
2851
2852 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
2854 let hash = Self::parse_invite_hash(link)
2855 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
2856 let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
2857 self.rpc_write(&req).await
2858 }
2859
/// Extracts the invite hash from a chat invite link.
///
/// Two link shapes are recognised, checked in this order:
/// * `…/+<hash>`
/// * `…/joinchat/<hash>`
///
/// Surrounding whitespace is ignored, and the hash ends at the first `?`, `#`
/// or `/` after the marker, so a query string, fragment or trailing slash is
/// never returned as part of the hash.
///
/// # Returns
/// `Some(hash)` borrowing from `link`, or `None` when the link has neither
/// marker or the hash part is empty.
pub fn parse_invite_hash(link: &str) -> Option<&str> {
    let link = link.trim();

    let tail = match link.find("/+") {
        Some(pos) => &link[pos + "/+".len()..],
        None => {
            let pos = link.find("/joinchat/")?;
            &link[pos + "/joinchat/".len()..]
        }
    };

    // Cut the hash at the first URL delimiter that may follow it.
    let hash = tail
        .split(|c: char| matches!(c, '?' | '#' | '/'))
        .next()
        .unwrap_or(tail);

    if hash.is_empty() {
        None
    } else {
        Some(hash)
    }
}
2870
2871 pub async fn get_messages(
2875 &self,
2876 peer: tl::enums::InputPeer,
2877 limit: i32,
2878 offset_id: i32,
2879 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2880 let req = tl::functions::messages::GetHistory {
2881 peer, offset_id, offset_date: 0, add_offset: 0,
2882 limit, max_id: 0, min_id: 0, hash: 0,
2883 };
2884 let body = self.rpc_call_raw(&req).await?;
2885 let mut cur = Cursor::from_slice(&body);
2886 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2887 tl::enums::messages::Messages::Messages(m) => m.messages,
2888 tl::enums::messages::Messages::Slice(m) => m.messages,
2889 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2890 tl::enums::messages::Messages::NotModified(_) => vec![],
2891 };
2892 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2893 }
2894
2895 pub async fn resolve_peer(
2899 &self,
2900 peer: &str,
2901 ) -> Result<tl::enums::Peer, InvocationError> {
2902 match peer.trim() {
2903 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
2904 username if username.starts_with('@') => {
2905 self.resolve_username(&username[1..]).await
2906 }
2907 id_str => {
2908 if let Ok(id) = id_str.parse::<i64>() {
2909 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
2910 } else {
2911 Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
2912 }
2913 }
2914 }
2915 }
2916
2917 pub async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
2921 let req = tl::functions::contacts::ResolveUsername {
2922 username: username.to_string(), referer: None,
2923 };
2924 let body = self.rpc_call_raw(&req).await?;
2925 let mut cur = Cursor::from_slice(&body);
2926 let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
2927 tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
2928 };
2929 self.cache_users_slice(&resolved.users).await;
2931 self.cache_chats_slice(&resolved.chats).await;
2932 Ok(resolved.peer)
2933 }
2934
2935 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
2939 let body = self.rpc_call_raw(req).await?;
2940 let mut cur = Cursor::from_slice(&body);
2941 R::Return::deserialize(&mut cur).map_err(Into::into)
2942 }
2943
2944 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
2945 let mut fail_count = NonZeroU32::new(1).unwrap();
2946 let mut slept_so_far = Duration::default();
2947 loop {
2948 match self.do_rpc_call(req).await {
2949 Ok(body) => return Ok(body),
2950 Err(e) => {
2951 let ctx = RetryContext { fail_count, slept_so_far, error: e };
2952 match self.inner.retry_policy.should_retry(&ctx) {
2953 ControlFlow::Continue(delay) => {
2954 sleep(delay).await;
2955 slept_so_far += delay;
2956 fail_count = fail_count.saturating_add(1);
2957 }
2958 ControlFlow::Break(()) => return Err(ctx.error),
2959 }
2960 }
2961 }
2962 }
2963 }
2964
2965 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
2975 let (tx, rx) = oneshot::channel();
2976 {
2977 let raw_body = req.to_bytes();
2978 let body = maybe_gz_pack(&raw_body);
2980
2981 let mut w = self.inner.writer.lock().await;
2982 let fk = w.frame_kind.clone();
2983
2984 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
2987
2988 if acks.is_empty() {
2989 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
2991 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
2993 send_frame_write(&mut w.write_half, &wire, &fk).await?;
2994 } else {
2995 let ack_body = build_msgs_ack_body(&acks);
2998 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false); let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true); let container_payload = build_container_body(&[
3004 (ack_msg_id, ack_seqno, ack_body.as_slice()),
3005 (req_msg_id, req_seqno, body.as_slice()),
3006 ]);
3007
3008 let wire = w.enc.pack_container(&container_payload);
3010
3011 w.sent_bodies.insert(req_msg_id, body); self.inner.pending.lock().await.insert(req_msg_id, tx);
3013 send_frame_write(&mut w.write_half, &wire, &fk).await?;
3014 log::trace!("[layer] G-07 container: bundled {} acks + request", acks.len());
3015 }
3016 }
3017 match tokio::time::timeout(Duration::from_secs(30), rx).await {
3018 Ok(Ok(result)) => result,
3019 Ok(Err(_)) => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
3020 Err(_) => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
3021 }
3022 }
3023
3024 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
3027 let mut fail_count = NonZeroU32::new(1).unwrap();
3028 let mut slept_so_far = Duration::default();
3029 loop {
3030 let result = self.do_rpc_write(req).await;
3031 match result {
3032 Ok(()) => return Ok(()),
3033 Err(e) => {
3034 let ctx = RetryContext { fail_count, slept_so_far, error: e };
3035 match self.inner.retry_policy.should_retry(&ctx) {
3036 ControlFlow::Continue(delay) => {
3037 sleep(delay).await;
3038 slept_so_far += delay;
3039 fail_count = fail_count.saturating_add(1);
3040 }
3041 ControlFlow::Break(()) => return Err(ctx.error),
3042 }
3043 }
3044 }
3045 }
3046 }
3047
3048 async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
3049 let (tx, rx) = oneshot::channel();
3050 {
3051 let raw_body = req.to_bytes();
3052 let body = maybe_gz_pack(&raw_body);
3054
3055 let mut w = self.inner.writer.lock().await;
3056 let fk = w.frame_kind.clone();
3057
3058 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
3060
3061 if acks.is_empty() {
3062 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
3063 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
3065 send_frame_write(&mut w.write_half, &wire, &fk).await?;
3066 } else {
3067 let ack_body = build_msgs_ack_body(&acks);
3068 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
3069 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
3070 let container_payload = build_container_body(&[
3071 (ack_msg_id, ack_seqno, ack_body.as_slice()),
3072 (req_msg_id, req_seqno, body.as_slice()),
3073 ]);
3074 let wire = w.enc.pack_container(&container_payload);
3075 w.sent_bodies.insert(req_msg_id, body); self.inner.pending.lock().await.insert(req_msg_id, tx);
3077 send_frame_write(&mut w.write_half, &wire, &fk).await?;
3078 log::trace!("[layer] G-07 write container: bundled {} acks + write", acks.len());
3079 }
3080 }
3081 match tokio::time::timeout(Duration::from_secs(30), rx).await {
3082 Ok(Ok(result)) => result.map(|_| ()),
3083 Ok(Err(_)) => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
3084 Err(_) => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
3085 }
3086 }
3087
3088 async fn init_connection(&self) -> Result<(), InvocationError> {
3091 use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
3092 let req = InvokeWithLayer {
3093 layer: tl::LAYER,
3094 query: InitConnection {
3095 api_id: self.inner.api_id,
3096 device_model: "Linux".to_string(),
3097 system_version: "1.0".to_string(),
3098 app_version: env!("CARGO_PKG_VERSION").to_string(),
3099 system_lang_code: "en".to_string(),
3100 lang_pack: "".to_string(),
3101 lang_code: "en".to_string(),
3102 proxy: None,
3103 params: None,
3104 query: GetConfig {},
3105 },
3106 };
3107
3108 let body = self.rpc_call_raw_serializable(&req).await?;
3110
3111 let mut cur = Cursor::from_slice(&body);
3112 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
3113 let allow_ipv6 = self.inner.allow_ipv6;
3114 let mut opts = self.inner.dc_options.lock().await;
3115 for opt in &cfg.dc_options {
3116 let tl::enums::DcOption::DcOption(o) = opt;
3117 if o.media_only || o.cdn || o.tcpo_only { continue; }
3118 if o.ipv6 && !allow_ipv6 { continue; }
3119 let addr = format!("{}:{}", o.ip_address, o.port);
3120 let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
3121 dc_id: o.id, addr: addr.clone(),
3122 auth_key: None, first_salt: 0, time_offset: 0,
3123 });
3124 entry.addr = addr;
3125 }
3126 log::info!("[layer] initConnection ✓ ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
3127 }
3128 Ok(())
3129 }
3130
3131 async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
3134 let addr = {
3135 let opts = self.inner.dc_options.lock().await;
3136 opts.get(&new_dc_id).map(|e| e.addr.clone())
3137 .unwrap_or_else(|| "149.154.167.51:443".to_string())
3138 };
3139 log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
3140
3141 let saved_key = {
3142 let opts = self.inner.dc_options.lock().await;
3143 opts.get(&new_dc_id).and_then(|e| e.auth_key)
3144 };
3145
3146 let socks5 = self.inner.socks5.clone();
3147 let transport = self.inner.transport.clone();
3148 let conn = if let Some(key) = saved_key {
3149 Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
3150 } else {
3151 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
3152 };
3153
3154 let new_key = conn.auth_key_bytes();
3155 {
3156 let mut opts = self.inner.dc_options.lock().await;
3157 let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
3158 dc_id: new_dc_id, addr: addr.clone(),
3159 auth_key: None, first_salt: 0, time_offset: 0,
3160 });
3161 entry.auth_key = Some(new_key);
3162 }
3163
3164 let (new_writer, new_read, new_fk) = conn.into_writer();
3166 let new_ak = new_writer.enc.auth_key_bytes();
3167 let new_sid = new_writer.enc.session_id();
3168 *self.inner.writer.lock().await = new_writer;
3169 *self.inner.home_dc_id.lock().await = new_dc_id;
3170
3171 let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));
3174
3175 loop {
3185 match self.init_connection().await {
3186 Ok(()) => break,
3187 Err(InvocationError::Rpc(ref r))
3188 if r.flood_wait_seconds().is_some() =>
3189 {
3190 let secs = r.flood_wait_seconds().unwrap();
3191 log::warn!(
3192 "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs} — waiting"
3193 );
3194 sleep(Duration::from_secs(secs + 1)).await;
3195 }
3196 Err(e) => return Err(e),
3197 }
3198 }
3199
3200 self.save_session().await.ok();
3201 log::info!("[layer] Now on DC{new_dc_id} ✓");
3202 Ok(())
3203 }
3204
3205 pub fn disconnect(&self) {
3215 self.inner.shutdown_token.cancel();
3216 }
3217
3218 pub async fn sync_update_state(&self) {
3226 let _ = self.sync_pts_state().await;
3227 }
3228
3229 async fn cache_user(&self, user: &tl::enums::User) {
3232 self.inner.peer_cache.lock().await.cache_user(user);
3233 }
3234
3235 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
3236 let mut cache = self.inner.peer_cache.lock().await;
3237 cache.cache_users(users);
3238 }
3239
3240 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
3241 let mut cache = self.inner.peer_cache.lock().await;
3242 cache.cache_chats(chats);
3243 }
3244
3245 #[doc(hidden)]
3247 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
3248 self.cache_users_slice(users).await;
3249 }
3250
3251 #[doc(hidden)]
3252 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
3253 self.cache_chats_slice(chats).await;
3254 }
3255
3256 #[doc(hidden)]
3258 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
3259 self.rpc_call_raw(req).await
3260 }
3261
3262 async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
3264 let mut fail_count = NonZeroU32::new(1).unwrap();
3265 let mut slept_so_far = Duration::default();
3266 loop {
3267 match self.do_rpc_write_returning_body(req).await {
3268 Ok(body) => return Ok(body),
3269 Err(e) => {
3270 let ctx = RetryContext { fail_count, slept_so_far, error: e };
3271 match self.inner.retry_policy.should_retry(&ctx) {
3272 ControlFlow::Continue(delay) => {
3273 sleep(delay).await;
3274 slept_so_far += delay;
3275 fail_count = fail_count.saturating_add(1);
3276 }
3277 ControlFlow::Break(()) => return Err(ctx.error),
3278 }
3279 }
3280 }
3281 }
3282 }
3283
3284 async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
3285 let (tx, rx) = oneshot::channel();
3286 {
3287 let raw_body = req.to_bytes();
3288 let body = maybe_gz_pack(&raw_body); let mut w = self.inner.writer.lock().await;
3290 let fk = w.frame_kind.clone();
3291 let acks: Vec<i64> = w.pending_ack.drain(..).collect(); if acks.is_empty() {
3293 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
3294 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
3296 send_frame_write(&mut w.write_half, &wire, &fk).await?;
3297 } else {
3298 let ack_body = build_msgs_ack_body(&acks);
3299 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
3300 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
3301 let container_payload = build_container_body(&[
3302 (ack_msg_id, ack_seqno, ack_body.as_slice()),
3303 (req_msg_id, req_seqno, body.as_slice()),
3304 ]);
3305 let wire = w.enc.pack_container(&container_payload);
3306 w.sent_bodies.insert(req_msg_id, body); self.inner.pending.lock().await.insert(req_msg_id, tx);
3308 send_frame_write(&mut w.write_half, &wire, &fk).await?;
3309 }
3310 }
3311 match tokio::time::timeout(Duration::from_secs(30), rx).await {
3312 Ok(Ok(result)) => result,
3313 Ok(Err(_)) => Err(InvocationError::Deserialize("rpc channel closed".into())),
3314 Err(_) => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
3315 }
3316 }
3317
3318 pub fn iter_dialogs(&self) -> DialogIter {
3335 DialogIter {
3336 offset_date: 0,
3337 offset_id: 0,
3338 offset_peer: tl::enums::InputPeer::Empty,
3339 done: false,
3340 buffer: VecDeque::new(),
3341 total: None,
3342 }
3343 }
3344
3345 pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
3359 MessageIter {
3360 peer,
3361 offset_id: 0,
3362 done: false,
3363 buffer: VecDeque::new(),
3364 total: None,
3365 }
3366 }
3367
3368 pub async fn resolve_to_input_peer(
3373 &self,
3374 peer: &tl::enums::Peer,
3375 ) -> Result<tl::enums::InputPeer, InvocationError> {
3376 let cache = self.inner.peer_cache.lock().await;
3377 match peer {
3378 tl::enums::Peer::User(u) => {
3379 if u.user_id == 0 {
3380 return Ok(tl::enums::InputPeer::PeerSelf);
3381 }
3382 match cache.users.get(&u.user_id) {
3383 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
3384 user_id: u.user_id, access_hash: hash,
3385 })),
3386 None => Err(InvocationError::Deserialize(format!(
3387 "access_hash unknown for user {}; resolve via username first", u.user_id
3388 ))),
3389 }
3390 }
3391 tl::enums::Peer::Chat(c) => {
3392 Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
3393 }
3394 tl::enums::Peer::Channel(c) => {
3395 match cache.channels.get(&c.channel_id) {
3396 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
3397 channel_id: c.channel_id, access_hash: hash,
3398 })),
3399 None => Err(InvocationError::Deserialize(format!(
3400 "access_hash unknown for channel {}; resolve via username first", c.channel_id
3401 ))),
3402 }
3403 }
3404 }
3405 }
3406
3407 pub async fn invoke_on_dc<R: RemoteCall>(
3415 &self,
3416 dc_id: i32,
3417 req: &R,
3418 ) -> Result<R::Return, InvocationError> {
3419 let body = self.rpc_on_dc_raw(dc_id, req).await?;
3420 let mut cur = Cursor::from_slice(&body);
3421 R::Return::deserialize(&mut cur).map_err(Into::into)
3422 }
3423
3424 async fn rpc_on_dc_raw<R: RemoteCall>(
3426 &self,
3427 dc_id: i32,
3428 req: &R,
3429 ) -> Result<Vec<u8>, InvocationError> {
3430 let needs_new = {
3432 let pool = self.inner.dc_pool.lock().await;
3433 !pool.has_connection(dc_id)
3434 };
3435
3436 if needs_new {
3437 let addr = {
3438 let opts = self.inner.dc_options.lock().await;
3439 opts.get(&dc_id).map(|e| e.addr.clone())
3440 .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
3441 };
3442
3443 let socks5 = self.inner.socks5.clone();
3444 let transport = self.inner.transport.clone();
3445 let saved_key = {
3446 let opts = self.inner.dc_options.lock().await;
3447 opts.get(&dc_id).and_then(|e| e.auth_key)
3448 };
3449
3450 let dc_conn = if let Some(key) = saved_key {
3451 dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
3452 } else {
3453 let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
3454 let home_dc_id = *self.inner.home_dc_id.lock().await;
3456 if dc_id != home_dc_id {
3457 if let Err(e) = self.export_import_auth(dc_id, &conn).await {
3458 log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
3459 }
3460 }
3461 conn
3462 };
3463
3464 let key = dc_conn.auth_key_bytes();
3465 {
3466 let mut opts = self.inner.dc_options.lock().await;
3467 if let Some(e) = opts.get_mut(&dc_id) {
3468 e.auth_key = Some(key);
3469 }
3470 }
3471 self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
3472 }
3473
3474 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
3475 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
3476 }
3477
3478 async fn export_import_auth(
3480 &self,
3481 dc_id: i32,
3482 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
3484 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
3486 let body = self.rpc_call_raw(&export_req).await?;
3487 let mut cur = Cursor::from_slice(&body);
3488 let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
3489 tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
3490 };
3491
3492 let import_req = tl::functions::auth::ImportAuthorization {
3494 id: exported.id,
3495 bytes: exported.bytes,
3496 };
3497 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
3498 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
3499 log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
3500 Ok(())
3501 }
3502
3503 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
3506 let body = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
3507 let mut cur = Cursor::from_slice(&body);
3508 let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
3509 tl::enums::account::Password::Password(p) => p,
3510 };
3511 Ok(PasswordToken { password: pw })
3512 }
3513
3514 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
3515 tl::functions::auth::SendCode {
3516 phone_number: phone.to_string(),
3517 api_id: self.inner.api_id,
3518 api_hash: self.inner.api_hash.clone(),
3519 settings: tl::enums::CodeSettings::CodeSettings(
3520 tl::types::CodeSettings {
3521 allow_flashcall: false, current_number: false, allow_app_hash: false,
3522 allow_missed_call: false, allow_firebase: false, unknown_number: false,
3523 logout_tokens: None, token: None, app_sandbox: None,
3524 },
3525 ),
3526 }
3527 }
3528
3529 fn extract_user_name(user: &tl::enums::User) -> String {
3530 match user {
3531 tl::enums::User::User(u) => {
3532 format!("{} {}",
3533 u.first_name.as_deref().unwrap_or(""),
3534 u.last_name.as_deref().unwrap_or(""))
3535 .trim().to_string()
3536 }
3537 tl::enums::User::Empty(_) => "(unknown)".into(),
3538 }
3539 }
3540
3541 fn extract_password_params(
3542 algo: &tl::enums::PasswordKdfAlgo,
3543 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
3544 match algo {
3545 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
3546 Ok((&a.salt1, &a.salt2, &a.p, a.g))
3547 }
3548 _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
3549 }
3550 }
3551}
3552
3553pub struct DialogIter {
3557 offset_date: i32,
3558 offset_id: i32,
3559 offset_peer: tl::enums::InputPeer,
3560 done: bool,
3561 buffer: VecDeque<Dialog>,
3562 pub total: Option<i32>,
3565}
3566
3567impl DialogIter {
3568 const PAGE_SIZE: i32 = 100;
3569
3570 pub fn total(&self) -> Option<i32> { self.total }
3576
3577 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
3579 if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
3580 if self.done { return Ok(None); }
3581
3582 let req = tl::functions::messages::GetDialogs {
3583 exclude_pinned: false,
3584 folder_id: None,
3585 offset_date: self.offset_date,
3586 offset_id: self.offset_id,
3587 offset_peer: self.offset_peer.clone(),
3588 limit: Self::PAGE_SIZE,
3589 hash: 0,
3590 };
3591
3592 let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
3593 if self.total.is_none() {
3595 self.total = count;
3596 }
3597 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
3598 self.done = true;
3599 }
3600
3601 if let Some(last) = dialogs.last() {
3603 self.offset_date = last.message.as_ref().map(|m| match m {
3604 tl::enums::Message::Message(x) => x.date,
3605 tl::enums::Message::Service(x) => x.date,
3606 _ => 0,
3607 }).unwrap_or(0);
3608 self.offset_id = last.top_message();
3609 if let Some(peer) = last.peer() {
3610 self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
3611 }
3612 }
3613
3614 self.buffer.extend(dialogs);
3615 Ok(self.buffer.pop_front())
3616 }
3617}
3618
3619pub struct MessageIter {
3621 peer: tl::enums::Peer,
3622 offset_id: i32,
3623 done: bool,
3624 buffer: VecDeque<update::IncomingMessage>,
3625 pub total: Option<i32>,
3629}
3630
3631impl MessageIter {
3632 const PAGE_SIZE: i32 = 100;
3633
3634 pub fn total(&self) -> Option<i32> { self.total }
3639
3640 pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
3642 if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
3643 if self.done { return Ok(None); }
3644
3645 let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
3646 let (page, count) = client.get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
3647
3648 if self.total.is_none() { self.total = count; }
3649
3650 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
3651 self.done = true;
3652 }
3653 if let Some(last) = page.last() {
3654 self.offset_id = last.id();
3655 }
3656
3657 self.buffer.extend(page);
3658 Ok(self.buffer.pop_front())
3659 }
3660}
3661
3662#[doc(hidden)]
3666pub fn random_i64_pub() -> i64 { random_i64() }
3667
/// Wire framing used on a connection.
#[derive(Clone)]
enum FrameKind {
    /// Length is sent as a word count: one byte, or `0x7f` plus three bytes.
    Abridged,
    /// Length is sent as a 4-byte little-endian byte count.
    Intermediate,
    /// Carries per-direction sequence counters. The send/receive helpers in
    /// this file frame it exactly like `Intermediate`.
    #[allow(dead_code)]
    Full { send_seqno: u32, recv_seqno: u32 },
}
3678
3679
3680struct ConnectionWriter {
3685 write_half: OwnedWriteHalf,
3686 enc: EncryptedSession,
3687 frame_kind: FrameKind,
3688 pending_ack: Vec<i64>,
3692 sent_bodies: std::collections::HashMap<i64, Vec<u8>>,
3696}
3697
3698impl ConnectionWriter {
3699 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
3700 fn first_salt(&self) -> i64 { self.enc.salt }
3701 fn time_offset(&self) -> i32 { self.enc.time_offset }
3702}
3703
3704struct Connection {
3705 stream: TcpStream,
3706 enc: EncryptedSession,
3707 frame_kind: FrameKind,
3708}
3709
3710impl Connection {
3711 async fn open_stream(
3713 addr: &str,
3714 socks5: Option<&crate::socks5::Socks5Config>,
3715 transport: &TransportKind,
3716 ) -> Result<(TcpStream, FrameKind), InvocationError> {
3717 let stream = match socks5 {
3718 Some(proxy) => proxy.connect(addr).await?,
3719 None => {
3720 let stream = TcpStream::connect(addr).await
3723 .map_err(InvocationError::Io)?;
3724
3725 {
3731 let sock = socket2::SockRef::from(&stream);
3732 let keepalive = TcpKeepalive::new()
3733 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
3734 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
3735 #[cfg(not(target_os = "windows"))]
3736 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
3737 sock.set_tcp_keepalive(&keepalive).ok();
3738 }
3739 stream
3740 }
3741 };
3742 Self::apply_transport_init(stream, transport).await
3743 }
3744
3745 async fn apply_transport_init(
3747 mut stream: TcpStream,
3748 transport: &TransportKind,
3749 ) -> Result<(TcpStream, FrameKind), InvocationError> {
3750 match transport {
3751 TransportKind::Abridged => {
3752 stream.write_all(&[0xef]).await?;
3753 Ok((stream, FrameKind::Abridged))
3754 }
3755 TransportKind::Intermediate => {
3756 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
3757 Ok((stream, FrameKind::Intermediate))
3758 }
3759 TransportKind::Full => {
3760 Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
3762 }
3763 TransportKind::Obfuscated { secret } => {
3764 let mut nonce = [0u8; 64];
3773 getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
3774 let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
3776 let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
3777 let mut handshake = nonce;
3779 handshake[56] = 0xef; handshake[57] = 0xef;
3780 handshake[58] = 0xef; handshake[59] = 0xef;
3781 enc_cipher.apply(&mut handshake[56..]);
3782 stream.write_all(&handshake).await?;
3783 Ok((stream, FrameKind::Abridged))
3784 }
3785 }
3786 }
3787
3788 async fn connect_raw(
3789 addr: &str,
3790 socks5: Option<&crate::socks5::Socks5Config>,
3791 transport: &TransportKind,
3792 ) -> Result<Self, InvocationError> {
3793 log::info!("[layer] Connecting to {addr} (DH) …");
3794
3795 let addr2 = addr.to_string();
3799 let socks5_c = socks5.cloned();
3800 let transport_c = transport.clone();
3801
3802 let fut = async move {
3803 let (mut stream, frame_kind) =
3804 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
3805
3806 let mut plain = Session::new();
3807
3808 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3809 send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
3810 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
3811
3812 let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3813 send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
3814 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
3815
3816 let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3817 send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
3818 let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;
3819
3820 let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3821 log::info!("[layer] DH complete ✓");
3822
3823 Ok::<Self, InvocationError>(Self {
3824 stream,
3825 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
3826 frame_kind,
3827 })
3828 };
3829
3830 tokio::time::timeout(Duration::from_secs(15), fut)
3831 .await
3832 .map_err(|_| InvocationError::Deserialize(
3833 format!("DH handshake with {addr} timed out after 15 s")
3834 ))?
3835 }
3836
3837 async fn connect_with_key(
3838 addr: &str,
3839 auth_key: [u8; 256],
3840 first_salt: i64,
3841 time_offset: i32,
3842 socks5: Option<&crate::socks5::Socks5Config>,
3843 transport: &TransportKind,
3844 ) -> Result<Self, InvocationError> {
3845 let addr2 = addr.to_string();
3846 let socks5_c = socks5.cloned();
3847 let transport_c = transport.clone();
3848
3849 let fut = async move {
3850 let (stream, frame_kind) =
3851 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
3852 Ok::<Self, InvocationError>(Self {
3853 stream,
3854 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
3855 frame_kind,
3856 })
3857 };
3858
3859 tokio::time::timeout(Duration::from_secs(15), fut)
3860 .await
3861 .map_err(|_| InvocationError::Deserialize(
3862 format!("connect_with_key to {addr} timed out after 15 s")
3863 ))?
3864 }
3865
3866 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
3867
3868 fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
3870 let (read_half, write_half) = self.stream.into_split();
3871 let writer = ConnectionWriter {
3872 write_half,
3873 enc: self.enc,
3874 frame_kind: self.frame_kind.clone(),
3875 pending_ack: Vec::new(),
3876 sent_bodies: std::collections::HashMap::new(),
3877 };
3878 (writer, read_half, self.frame_kind)
3879 }
3880}
3881
3882async fn send_frame(
3886 stream: &mut TcpStream,
3887 data: &[u8],
3888 kind: &FrameKind,
3889) -> Result<(), InvocationError> {
3890 match kind {
3891 FrameKind::Abridged => send_abridged(stream, data).await,
3892 FrameKind::Intermediate => {
3893 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3894 stream.write_all(data).await?;
3895 Ok(())
3896 }
3897 FrameKind::Full { .. } => {
3898 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3904 stream.write_all(data).await?;
3905 Ok(())
3906 }
3907 }
3908}
3909
3910enum FrameOutcome {
3914 Frame(Vec<u8>),
3915 Error(InvocationError),
3916 Keepalive, }
3918
3919async fn recv_frame_with_keepalive(
3926 rh: &mut OwnedReadHalf,
3927 fk: &FrameKind,
3928 client: &Client,
3929 _ak: &[u8; 256],
3930) -> FrameOutcome {
3931 match tokio::time::timeout(Duration::from_secs(PING_DELAY_SECS), recv_frame_read(rh, fk)).await {
3932 Ok(Ok(raw)) => FrameOutcome::Frame(raw),
3933 Ok(Err(e)) => FrameOutcome::Error(e),
3934 Err(_) => {
3935 let ping_req = tl::functions::PingDelayDisconnect {
3939 ping_id: random_i64(),
3940 disconnect_delay: NO_PING_DISCONNECT,
3941 };
3942 let mut w = client.inner.writer.lock().await;
3943 let wire = w.enc.pack(&ping_req);
3944 let fk = w.frame_kind.clone();
3945 match send_frame_write(&mut w.write_half, &wire, &fk).await {
3946 Ok(()) => FrameOutcome::Keepalive,
3947 Err(e) => FrameOutcome::Error(e),
3951 }
3952 }
3953 }
3954}
3955
3956async fn send_frame_write(
3958 stream: &mut OwnedWriteHalf,
3959 data: &[u8],
3960 kind: &FrameKind,
3961) -> Result<(), InvocationError> {
3962 match kind {
3963 FrameKind::Abridged => {
3964 let words = data.len() / 4;
3965 if words < 0x7f {
3966 stream.write_all(&[words as u8]).await?;
3967 } else {
3968 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
3969 stream.write_all(&b).await?;
3970 }
3971 stream.write_all(data).await?;
3972 Ok(())
3973 }
3974 FrameKind::Intermediate | FrameKind::Full { .. } => {
3975 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3976 stream.write_all(data).await?;
3977 Ok(())
3978 }
3979 }
3980}
3981
3982async fn recv_frame_read(
3984 stream: &mut OwnedReadHalf,
3985 kind: &FrameKind,
3986) -> Result<Vec<u8>, InvocationError> {
3987 match kind {
3988 FrameKind::Abridged => {
3989 let mut h = [0u8; 1];
3990 stream.read_exact(&mut h).await?;
3991 let words = if h[0] < 0x7f {
3992 h[0] as usize
3993 } else {
3994 let mut b = [0u8; 3];
3995 stream.read_exact(&mut b).await?;
3996 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
3997 };
3998 let len = words * 4;
3999 let mut buf = vec![0u8; len];
4000 stream.read_exact(&mut buf).await?;
4001 Ok(buf)
4002 }
4003 FrameKind::Intermediate | FrameKind::Full { .. } => {
4004 let mut len_buf = [0u8; 4];
4005 stream.read_exact(&mut len_buf).await?;
4006 let len = u32::from_le_bytes(len_buf) as usize;
4007 let mut buf = vec![0u8; len];
4008 stream.read_exact(&mut buf).await?;
4009 Ok(buf)
4010 }
4011 }
4012}
4013
4014
4015async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
4017 let words = data.len() / 4;
4018 if words < 0x7f {
4019 stream.write_all(&[words as u8]).await?;
4020 } else {
4021 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
4022 stream.write_all(&b).await?;
4023 }
4024 stream.write_all(data).await?;
4025 Ok(())
4026}
4027
4028async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
4029 let mut h = [0u8; 1];
4030 stream.read_exact(&mut h).await?;
4031 let words = if h[0] < 0x7f {
4032 h[0] as usize
4033 } else {
4034 let mut b = [0u8; 3];
4035 stream.read_exact(&mut b).await?;
4036 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
4037 if w == 1 {
4039 let mut code_buf = [0u8; 4];
4040 stream.read_exact(&mut code_buf).await?;
4041 let code = i32::from_le_bytes(code_buf);
4042 return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
4043 }
4044 w
4045 };
4046 if words == 0 || words > 0x8000 {
4049 return Err(InvocationError::Deserialize(
4050 format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
4051 ));
4052 }
4053 let mut buf = vec![0u8; words * 4];
4054 stream.read_exact(&mut buf).await?;
4055 Ok(buf)
4056}
4057
4058async fn recv_frame_plain<T: Deserializable>(
4060 stream: &mut TcpStream,
4061 _kind: &FrameKind,
4062) -> Result<T, InvocationError> {
4063 let raw = recv_abridged(stream).await?; if raw.len() < 20 {
4065 return Err(InvocationError::Deserialize("plaintext frame too short".into()));
4066 }
4067 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
4068 return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
4069 }
4070 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
4071 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
4072 T::deserialize(&mut cur).map_err(Into::into)
4073}
4074
4075enum EnvelopeResult {
4078 Payload(Vec<u8>),
4079 Updates(Vec<update::Update>),
4080 None,
4081}
4082
4083fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
4084 if body.len() < 4 {
4085 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
4086 }
4087 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
4088
4089 match cid {
4090 ID_RPC_RESULT => {
4091 if body.len() < 12 {
4092 return Err(InvocationError::Deserialize("rpc_result too short".into()));
4093 }
4094 unwrap_envelope(body[12..].to_vec())
4095 }
4096 ID_RPC_ERROR => {
4097 if body.len() < 8 {
4098 return Err(InvocationError::Deserialize("rpc_error too short".into()));
4099 }
4100 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
4101 let message = tl_read_string(&body[8..]).unwrap_or_default();
4102 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
4103 }
4104 ID_MSG_CONTAINER => {
4105 if body.len() < 8 {
4106 return Err(InvocationError::Deserialize("container too short".into()));
4107 }
4108 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
4109 let mut pos = 8usize;
4110 let mut payload: Option<Vec<u8>> = None;
4111 let mut updates_buf: Vec<update::Update> = Vec::new();
4112
4113 for _ in 0..count {
4114 if pos + 16 > body.len() { break; }
4115 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
4116 pos += 16;
4117 if pos + inner_len > body.len() { break; }
4118 let inner = body[pos..pos + inner_len].to_vec();
4119 pos += inner_len;
4120 match unwrap_envelope(inner)? {
4121 EnvelopeResult::Payload(p) => { payload = Some(p); }
4122 EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
4123 EnvelopeResult::None => {}
4124 }
4125 }
4126 if let Some(p) = payload {
4127 Ok(EnvelopeResult::Payload(p))
4128 } else if !updates_buf.is_empty() {
4129 Ok(EnvelopeResult::Updates(updates_buf))
4130 } else {
4131 Ok(EnvelopeResult::None)
4132 }
4133 }
4134 ID_GZIP_PACKED => {
4135 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
4136 unwrap_envelope(gz_inflate(&bytes)?)
4137 }
4138 ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
4143 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
4156 Ok(EnvelopeResult::None)
4157 }
4158 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
4159 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
4160 | ID_UPDATES_TOO_LONG => {
4161 Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
4162 }
4163 _ => Ok(EnvelopeResult::Payload(body)),
4164 }
4165}
4166
4167fn random_i64() -> i64 {
4170 let mut b = [0u8; 8];
4171 getrandom::getrandom(&mut b).expect("getrandom");
4172 i64::from_le_bytes(b)
4173}
4174
4175fn jitter_delay(base_ms: u64) -> Duration {
4179 let mut b = [0u8; 2];
4181 getrandom::getrandom(&mut b).unwrap_or(());
4182 let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; let factor = 0.80 + rand_frac * 0.40; Duration::from_millis((base_ms as f64 * factor) as u64)
4185}
4186
/// Decode a TL-style length-prefixed byte string from the start of `data`.
///
/// * Empty input decodes to an empty vector.
/// * A first byte below 254 is the length itself; the bytes follow directly.
/// * Otherwise the next three bytes are a little-endian length and the bytes
///   start at offset 4.
///
/// Returns `None` when the header or the announced bytes are not fully
/// present. Trailing padding is neither required nor consumed.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let (len, start) = match data {
        [] => return Some(Vec::new()),
        [short, ..] if *short < 254 => (usize::from(*short), 1),
        [_, b0, b1, b2, ..] => (u32::from_le_bytes([*b0, *b1, *b2, 0]) as usize, 4),
        _ => return None,
    };
    data.get(start..start + len).map(|bytes| bytes.to_vec())
}
4196
4197fn tl_read_string(data: &[u8]) -> Option<String> {
4198 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
4199}
4200
4201fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
4202 use std::io::Read;
4203 let mut out = Vec::new();
4204 if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
4205 return Ok(out);
4206 }
4207 out.clear();
4208 flate2::read::ZlibDecoder::new(data)
4209 .read_to_end(&mut out)
4210 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
4211 Ok(out)
4212}
4213
/// Bodies of this many bytes or fewer are never compressed by `maybe_gz_pack`.
const COMPRESSION_THRESHOLD: usize = 512;
4219
/// Encode `data` as a TL-style length-prefixed byte string, zero-padded to a
/// multiple of four bytes.
///
/// Lengths below 254 use a one-byte header; longer inputs use `0xfe` followed
/// by the low three bytes of the length (little-endian).
fn tl_write_bytes(data: &[u8]) -> Vec<u8> {
    let len = data.len();
    let mut out = Vec::with_capacity(4 + len);

    let header_len = if len < 254 {
        out.push(len as u8);
        1
    } else {
        out.push(0xfe);
        out.extend_from_slice(&(len as u32).to_le_bytes()[..3]);
        4
    };
    out.extend_from_slice(data);

    // Pad header + data up to the next 4-byte boundary.
    let padding = (4 - (header_len + len) % 4) % 4;
    out.resize(out.len() + padding, 0);
    out
}
4238
4239fn gz_pack_body(data: &[u8]) -> Vec<u8> {
4241 use std::io::Write;
4242 let mut enc = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
4243 let _ = enc.write_all(data);
4244 let compressed = enc.finish().unwrap_or_default();
4245 let mut out = Vec::with_capacity(4 + 4 + compressed.len());
4246 out.extend_from_slice(&ID_GZIP_PACKED.to_le_bytes());
4247 out.extend(tl_write_bytes(&compressed));
4248 out
4249}
4250
4251fn maybe_gz_pack(data: &[u8]) -> Vec<u8> {
4254 if data.len() <= COMPRESSION_THRESHOLD {
4255 return data.to_vec();
4256 }
4257 let packed = gz_pack_body(data);
4258 if packed.len() < data.len() { packed } else { data.to_vec() }
4259}
4260
4261fn build_msgs_ack_body(msg_ids: &[i64]) -> Vec<u8> {
4265 let mut out = Vec::with_capacity(4 + 4 + 4 + msg_ids.len() * 8);
4268 out.extend_from_slice(&ID_MSGS_ACK.to_le_bytes());
4269 out.extend_from_slice(&0x1cb5c415_u32.to_le_bytes()); out.extend_from_slice(&(msg_ids.len() as u32).to_le_bytes());
4271 for &id in msg_ids {
4272 out.extend_from_slice(&id.to_le_bytes());
4273 }
4274 out
4275}
4276
4277fn build_container_body(messages: &[(i64, i32, &[u8])]) -> Vec<u8> {
4285 let total_body: usize = messages.iter().map(|(_, _, b)| 16 + b.len()).sum();
4286 let mut out = Vec::with_capacity(8 + total_body);
4287 out.extend_from_slice(&ID_MSG_CONTAINER.to_le_bytes());
4288 out.extend_from_slice(&(messages.len() as u32).to_le_bytes());
4289 for &(msg_id, seqno, body) in messages {
4290 out.extend_from_slice(&msg_id.to_le_bytes());
4291 out.extend_from_slice(&seqno.to_le_bytes());
4292 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
4293 out.extend_from_slice(body);
4294 }
4295 out
4296}