1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc(html_root_url = "https://docs.rs/layer-client/0.4.9")]
3#![deny(unsafe_code)]
23
24pub mod builder;
25mod errors;
26pub mod media;
27pub mod parsers;
28pub mod participants;
29pub mod pts;
30mod restart;
31mod retry;
32mod session;
33mod transport;
34mod two_factor_auth;
35pub mod update;
36
37pub mod dc_pool;
38pub mod inline_iter;
39pub mod keyboard;
40pub mod search;
41pub mod session_backend;
42pub mod socks5;
43pub mod transport_intermediate;
44pub mod transport_obfuscated;
45pub mod types;
46pub mod typing_guard;
47
48#[macro_use]
49pub mod macros;
50pub mod peer_ref;
51pub mod reactions;
52
53#[cfg(test)]
54mod pts_tests;
55
56pub mod dc_migration;
57pub mod proxy;
58
59pub use builder::{BuilderError, ClientBuilder};
60pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
61pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
62pub use media::{Document, DownloadIter, Downloadable, Photo, Sticker, UploadedFile};
63pub use participants::{Participant, ProfilePhotoIter};
64pub use peer_ref::PeerRef;
65pub use proxy::{MtProxyConfig, parse_proxy_link};
66pub use restart::{ConnectionRestartPolicy, FixedInterval, NeverRestart};
67use retry::RetryLoop;
68pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
69pub use search::{GlobalSearchBuilder, SearchBuilder};
70pub use session::{DcEntry, DcFlags};
71#[cfg(feature = "libsql-session")]
72#[cfg_attr(docsrs, doc(cfg(feature = "libsql-session")))]
73pub use session_backend::LibSqlBackend;
74#[cfg(feature = "sqlite-session")]
75#[cfg_attr(docsrs, doc(cfg(feature = "sqlite-session")))]
76pub use session_backend::SqliteBackend;
77pub use session_backend::{
78 BinaryFileBackend, InMemoryBackend, SessionBackend, StringSessionBackend, UpdateStateChange,
79};
80pub use socks5::Socks5Config;
81pub use types::ChannelKind;
82pub use types::{Channel, Chat, Group, User};
83pub use typing_guard::TypingGuard;
84pub use update::Update;
85pub use update::{ChatActionUpdate, UserStatusUpdate};
86
87pub use layer_tl_types as tl;
90
91use std::collections::HashMap;
92use std::collections::VecDeque;
93use std::num::NonZeroU32;
94use std::ops::ControlFlow;
95use std::sync::Arc;
96use std::time::Duration;
97
98use layer_mtproto::{EncryptedSession, Session, authentication as auth};
99use layer_tl_types::{Cursor, Deserializable, RemoteCall};
100use session::PersistedSession;
101use socket2::TcpKeepalive;
102use tokio::io::{AsyncReadExt, AsyncWriteExt};
103use tokio::net::TcpStream;
104use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
105use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
106use tokio::time::sleep;
107use tokio_util::sync::CancellationToken;
108
// 32-bit identifiers for protocol-level (service) messages.
// NOTE(review): presumably these are TL constructor IDs compared against the
// leading `u32` of each decoded server message; the dispatch code that uses
// them is outside this part of the file — confirm there.
const ID_RPC_RESULT: u32 = 0xf35c6d01;
const ID_RPC_ERROR: u32 = 0x2144ca19;
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc;
const ID_GZIP_PACKED: u32 = 0x3072cfa1;
const ID_PONG: u32 = 0x347773c5;
const ID_MSGS_ACK: u32 = 0x62d6b459;
const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
const ID_NEW_SESSION: u32 = 0x9ec20908;
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811;
const ID_FUTURE_SALTS: u32 = 0xae500895;
const ID_MSG_DETAILED_INFO: u32 = 0x276d3ec6;
const ID_MSG_NEW_DETAIL_INFO: u32 = 0x809db6df;
const ID_MSG_RESEND_REQ: u32 = 0x7d861a08;
// 32-bit identifiers for the top-level "updates" envelope variants.
const ID_UPDATES: u32 = 0x74ae4240;
const ID_UPDATE_SHORT: u32 = 0x78d4dec1;
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
const ID_UPDATE_SHORT_SENT_MSG: u32 = 0x9015e101;
const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e;

// Keep-alive ping period, in seconds (per the `_SECS` suffix).
const PING_DELAY_SECS: u64 = 60;

// NOTE(review): presumably the server-side disconnect delay, in seconds, sent
// with each ping; it is larger than `PING_DELAY_SECS` — confirm at the use site.
const NO_PING_DISCONNECT: i32 = 75;

// Reconnect back-off: base delay in milliseconds …
const RECONNECT_BASE_MS: u64 = 500;

// … and the upper bound for the back-off, in seconds.
const RECONNECT_MAX_SECS: u64 = 5;

// TCP keep-alive tuning: idle time before the first probe (seconds),
// interval between probes (seconds), and number of probes.
// NOTE(review): presumably fed into `socket2::TcpKeepalive`; confirm at the use site.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
const TCP_KEEPALIVE_PROBES: u32 = 3;
162
163#[derive(Default)]
171pub struct PeerCache {
172 pub users: HashMap<i64, i64>,
174 pub channels: HashMap<i64, i64>,
176}
177
178impl PeerCache {
179 fn cache_user(&mut self, user: &tl::enums::User) {
180 if let tl::enums::User::User(u) = user
181 && let Some(hash) = u.access_hash
182 {
183 self.users.insert(u.id, hash);
184 }
185 }
186
187 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
188 match chat {
189 tl::enums::Chat::Channel(c) => {
190 if let Some(hash) = c.access_hash {
191 self.channels.insert(c.id, hash);
192 }
193 }
194 tl::enums::Chat::ChannelForbidden(c) => {
195 self.channels.insert(c.id, c.access_hash);
196 }
197 _ => {}
198 }
199 }
200
201 fn cache_users(&mut self, users: &[tl::enums::User]) {
202 for u in users {
203 self.cache_user(u);
204 }
205 }
206
207 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
208 for c in chats {
209 self.cache_chat(c);
210 }
211 }
212
213 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
214 if user_id == 0 {
215 return tl::enums::InputPeer::PeerSelf;
216 }
217 let hash = self.users.get(&user_id).copied().unwrap_or_else(|| {
218 tracing::warn!("[layer] PeerCache: no access_hash for user {user_id}, using 0: may cause USER_ID_INVALID");
219 0
220 });
221 tl::enums::InputPeer::User(tl::types::InputPeerUser {
222 user_id,
223 access_hash: hash,
224 })
225 }
226
227 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
228 let hash = self.channels.get(&channel_id).copied().unwrap_or_else(|| {
229 tracing::warn!("[layer] PeerCache: no access_hash for channel {channel_id}, using 0: may cause CHANNEL_INVALID");
230 0
231 });
232 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
233 channel_id,
234 access_hash: hash,
235 })
236 }
237
238 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
239 match peer {
240 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
241 tl::enums::Peer::Chat(c) => {
242 tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id })
243 }
244 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
245 }
246 }
247}
248
249#[derive(Clone, Default)]
271pub struct InputMessage {
272 pub text: String,
273 pub reply_to: Option<i32>,
274 pub silent: bool,
275 pub background: bool,
276 pub clear_draft: bool,
277 pub no_webpage: bool,
278 pub invert_media: bool,
280 pub schedule_once_online: bool,
282 pub entities: Option<Vec<tl::enums::MessageEntity>>,
283 pub reply_markup: Option<tl::enums::ReplyMarkup>,
284 pub schedule_date: Option<i32>,
285 pub media: Option<tl::enums::InputMedia>,
288}
289
290impl InputMessage {
291 pub fn text(text: impl Into<String>) -> Self {
293 Self {
294 text: text.into(),
295 ..Default::default()
296 }
297 }
298
299 pub fn markdown(text: impl AsRef<str>) -> Self {
312 let (plain, ents) = crate::parsers::parse_markdown(text.as_ref());
313 Self {
314 text: plain,
315 entities: if ents.is_empty() { None } else { Some(ents) },
316 ..Default::default()
317 }
318 }
319
320 pub fn html(text: impl AsRef<str>) -> Self {
331 let (plain, ents) = crate::parsers::parse_html(text.as_ref());
332 Self {
333 text: plain,
334 entities: if ents.is_empty() { None } else { Some(ents) },
335 ..Default::default()
336 }
337 }
338
339 pub fn set_text(mut self, text: impl Into<String>) -> Self {
341 self.text = text.into();
342 self
343 }
344
345 pub fn reply_to(mut self, id: Option<i32>) -> Self {
347 self.reply_to = id;
348 self
349 }
350
351 pub fn silent(mut self, v: bool) -> Self {
353 self.silent = v;
354 self
355 }
356
357 pub fn background(mut self, v: bool) -> Self {
359 self.background = v;
360 self
361 }
362
363 pub fn clear_draft(mut self, v: bool) -> Self {
365 self.clear_draft = v;
366 self
367 }
368
369 pub fn no_webpage(mut self, v: bool) -> Self {
371 self.no_webpage = v;
372 self
373 }
374
375 pub fn invert_media(mut self, v: bool) -> Self {
377 self.invert_media = v;
378 self
379 }
380
381 pub fn schedule_once_online(mut self) -> Self {
386 self.schedule_once_online = true;
387 self.schedule_date = None;
388 self
389 }
390
391 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
393 self.entities = Some(e);
394 self
395 }
396
397 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
399 self.reply_markup = Some(rm);
400 self
401 }
402
403 pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
413 self.reply_markup = Some(kb.into());
414 self
415 }
416
417 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
419 self.schedule_date = ts;
420 self
421 }
422
423 pub fn copy_media(mut self, media: tl::enums::InputMedia) -> Self {
439 self.media = Some(media);
440 self
441 }
442
443 pub fn clear_media(mut self) -> Self {
445 self.media = None;
446 self
447 }
448
449 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
450 self.reply_to.map(|id| {
451 tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
452 reply_to_msg_id: id,
453 top_msg_id: None,
454 reply_to_peer_id: None,
455 quote_text: None,
456 quote_entities: None,
457 quote_offset: None,
458 monoforum_peer_id: None,
459 todo_item_id: None,
460 poll_option: None,
461 })
462 })
463 }
464}
465
466impl From<&str> for InputMessage {
467 fn from(s: &str) -> Self {
468 Self::text(s)
469 }
470}
471
472impl From<String> for InputMessage {
473 fn from(s: String) -> Self {
474 Self::text(s)
475 }
476}
477
/// Selects the wire transport used for a connection.
///
/// NOTE(review): the variant names presumably map onto the MTProto transport
/// framings of the same names; the code that interprets them is outside this
/// part of the file.
#[derive(Clone, Debug)]
pub enum TransportKind {
    Abridged,
    Intermediate,
    Full,
    /// Carries an optional 16-byte secret.
    Obfuscated { secret: Option<[u8; 16]> },
    /// Carries an optional 16-byte secret.
    PaddedIntermediate { secret: Option<[u8; 16]> },
    /// Carries a mandatory 16-byte secret and a domain name.
    FakeTls { secret: [u8; 16], domain: String },
}

impl Default for TransportKind {
    /// The default transport is `Obfuscated` with no secret.
    fn default() -> Self {
        Self::Obfuscated { secret: None }
    }
}
537
/// Alias for [`CancellationToken`]; `Client::connect` returns one alongside
/// the client, and a clone of it is handed to the background reader task.
pub type ShutdownToken = CancellationToken;
558
559#[derive(Clone)]
561pub struct Config {
562 pub api_id: i32,
563 pub api_hash: String,
564 pub dc_addr: Option<String>,
565 pub retry_policy: Arc<dyn RetryPolicy>,
566 pub socks5: Option<crate::socks5::Socks5Config>,
568 pub mtproxy: Option<crate::proxy::MtProxyConfig>,
572 pub allow_ipv6: bool,
574 pub transport: TransportKind,
576 pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
578 pub catch_up: bool,
582 pub restart_policy: Arc<dyn ConnectionRestartPolicy>,
583 pub device_model: String,
585 pub system_version: String,
587 pub app_version: String,
589 pub system_lang_code: String,
591 pub lang_pack: String,
593 pub lang_code: String,
595}
596
597impl Config {
598 pub fn with_string_session(s: impl Into<String>) -> Self {
613 Config {
614 session_backend: Arc::new(crate::session_backend::StringSessionBackend::new(s)),
615 ..Config::default()
616 }
617 }
618
619 pub fn proxy_link(mut self, url: &str) -> Self {
637 if url.is_empty() {
638 return self;
639 }
640 let cfg = crate::proxy::parse_proxy_link(url)
641 .unwrap_or_else(|| panic!("invalid MTProxy link: {url:?}"));
642 self.mtproxy = Some(cfg);
643 self
644 }
645
646 pub fn proxy(self, host: impl Into<String>, port: u16, secret: &str) -> Self {
663 let host = host.into();
664 let url = format!("tg://proxy?server={host}&port={port}&secret={secret}");
665 self.proxy_link(&url)
666 }
667
668 pub fn socks5(mut self, addr: impl Into<String>) -> Self {
681 self.socks5 = Some(crate::socks5::Socks5Config::new(addr));
682 self
683 }
684
685 pub fn socks5_auth(
698 mut self,
699 addr: impl Into<String>,
700 username: impl Into<String>,
701 password: impl Into<String>,
702 ) -> Self {
703 self.socks5 = Some(crate::socks5::Socks5Config::with_auth(
704 addr, username, password,
705 ));
706 self
707 }
708}
709
710impl Default for Config {
711 fn default() -> Self {
712 Self {
713 api_id: 0,
714 api_hash: String::new(),
715 dc_addr: None,
716 retry_policy: Arc::new(AutoSleep::default()),
717 socks5: None,
718 mtproxy: None,
719 allow_ipv6: false,
720 transport: TransportKind::Obfuscated { secret: None },
721 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new(
722 "layer.session",
723 )),
724 catch_up: false,
725 restart_policy: Arc::new(NeverRestart),
726 device_model: "Linux".to_string(),
727 system_version: "1.0".to_string(),
728 app_version: env!("CARGO_PKG_VERSION").to_string(),
729 system_lang_code: "en".to_string(),
730 lang_pack: String::new(),
731 lang_code: "en".to_string(),
732 }
733 }
734}
735
736pub struct UpdateStream {
741 rx: mpsc::UnboundedReceiver<update::Update>,
742}
743
744impl UpdateStream {
745 pub async fn next(&mut self) -> Option<update::Update> {
747 self.rx.recv().await
748 }
749
750 pub async fn next_raw(&mut self) -> Option<update::RawUpdate> {
756 loop {
757 match self.rx.recv().await? {
758 update::Update::Raw(r) => return Some(r),
759 _ => continue,
760 }
761 }
762 }
763}
764
765#[derive(Debug, Clone)]
769pub struct Dialog {
770 pub raw: tl::enums::Dialog,
771 pub message: Option<tl::enums::Message>,
772 pub entity: Option<tl::enums::User>,
773 pub chat: Option<tl::enums::Chat>,
774}
775
776impl Dialog {
777 pub fn title(&self) -> String {
779 if let Some(tl::enums::User::User(u)) = &self.entity {
780 let first = u.first_name.as_deref().unwrap_or("");
781 let last = u.last_name.as_deref().unwrap_or("");
782 let name = format!("{first} {last}").trim().to_string();
783 if !name.is_empty() {
784 return name;
785 }
786 }
787 if let Some(chat) = &self.chat {
788 return match chat {
789 tl::enums::Chat::Chat(c) => c.title.clone(),
790 tl::enums::Chat::Forbidden(c) => c.title.clone(),
791 tl::enums::Chat::Channel(c) => c.title.clone(),
792 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
793 tl::enums::Chat::Empty(_) => "(empty)".into(),
794 };
795 }
796 "(Unknown)".to_string()
797 }
798
799 pub fn peer(&self) -> Option<&tl::enums::Peer> {
801 match &self.raw {
802 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
803 tl::enums::Dialog::Folder(_) => None,
804 }
805 }
806
807 pub fn unread_count(&self) -> i32 {
809 match &self.raw {
810 tl::enums::Dialog::Dialog(d) => d.unread_count,
811 _ => 0,
812 }
813 }
814
815 pub fn top_message(&self) -> i32 {
817 match &self.raw {
818 tl::enums::Dialog::Dialog(d) => d.top_message,
819 _ => 0,
820 }
821 }
822}
823
/// Shared state behind every [`Client`] handle (held in an `Arc`).
struct ClientInner {
    /// Outgoing-side connection state; replaced wholesale when
    /// `Client::connect` performs a fresh key exchange.
    writer: Mutex<ConnectionWriter>,
    /// Write half of the TCP stream; replaced together with `writer`.
    write_half: Mutex<OwnedWriteHalf>,
    /// In-flight requests: each entry resolves to the raw response bytes or
    /// an error. NOTE(review): presumably keyed by message ID — confirm.
    #[allow(clippy::type_complexity)]
    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Hands a new read half, frame kind, auth key bytes and session ID to
    /// the reader task after a reconnect.
    reconnect_tx: mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Unit-valued signal channel whose receiver is given to the reader task.
    network_hint_tx: mpsc::UnboundedSender<()>,
    #[allow(dead_code)]
    shutdown_token: CancellationToken,
    #[allow(dead_code)]
    catch_up: bool,
    restart_policy: Arc<dyn ConnectionRestartPolicy>,
    /// ID of the home data centre.
    home_dc_id: Mutex<i32>,
    /// Known data centres, keyed by DC ID.
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Data centres flagged `MEDIA_ONLY` or `CDN`, keyed by DC ID.
    media_dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache for users and channels.
    pub peer_cache: RwLock<PeerCache>,
    /// Update sequence state (pts/qts/date/seq and per-channel pts).
    pub pts_state: Mutex<pts::PtsState>,
    pub possible_gap: Mutex<pts::PossibleGapBuffer>,
    // The following fields are copied from `Config` in `Client::connect`.
    api_id: i32,
    api_hash: String,
    device_model: String,
    system_version: String,
    app_version: String,
    system_lang_code: String,
    lang_pack: String,
    lang_code: String,
    retry_policy: Arc<dyn RetryPolicy>,
    socks5: Option<crate::socks5::Socks5Config>,
    mtproxy: Option<crate::proxy::MtProxyConfig>,
    allow_ipv6: bool,
    transport: TransportKind,
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    dc_pool: Mutex<dc_pool::DcPool>,
    /// Sending side of the bounded update channel created in
    /// `Client::connect`.
    update_tx: mpsc::Sender<update::Update>,
    /// Set to `true` after a successful `bot_sign_in`.
    pub is_bot: std::sync::atomic::AtomicBool,
    stream_active: std::sync::atomic::AtomicBool,
    salt_request_in_flight: std::sync::atomic::AtomicBool,
    /// Guards the fresh key-exchange path in `Client::connect` so that it is
    /// entered at most once at a time.
    dh_in_progress: std::sync::atomic::AtomicBool,
}
891
/// Cheaply clonable handle to a connected client; all clones share the same
/// [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    pub(crate) inner: Arc<ClientInner>,
    // Receiving side of the bounded update channel created in `connect`.
    _update_rx: Arc<Mutex<mpsc::Receiver<update::Update>>>,
}
898
899impl Client {
900 pub fn builder() -> crate::builder::ClientBuilder {
915 crate::builder::ClientBuilder::default()
916 }
917
/// Connects to Telegram and returns the client plus a shutdown token.
///
/// Steps, in order:
/// 1. Validate `api_id` / `api_hash`.
/// 2. Load the session from `config.session_backend`. If it holds an auth
///    key for its home DC, reconnect with that key; otherwise open a fresh
///    connection via `fresh_connect`.
/// 3. Spawn the background reader task.
/// 4. Call `init_connection`. On RPC error code `-404` the stored key is
///    discarded, a fresh connection is made and `init_connection` is
///    retried (with back-off on RPC code `401`).
/// 5. Restore the peer cache from the session.
/// 6. If `config.catch_up` is set and the session has saved update state,
///    restore it and replay missed updates in the background; otherwise
///    call `sync_pts_state`.
///
/// # Errors
///
/// * `InvocationError::Deserialize` when `api_id` is `0` or `api_hash` is
///   empty.
/// * `InvocationError::Io` when the session backend fails to load.
/// * Any error from connecting or from `init_connection` that is not
///   handled by the fresh-key fallback.
pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
    // Reject unusable credentials before doing any I/O.
    if config.api_id == 0 {
        return Err(InvocationError::Deserialize(
            "api_id must be non-zero".into(),
        ));
    }
    if config.api_hash.is_empty() {
        return Err(InvocationError::Deserialize(
            "api_hash must not be empty".into(),
        ));
    }

    // Bounded update channel; producers below use `try_send` and drop
    // updates when it is full.
    let (update_tx, update_rx) = mpsc::channel(2048);

    // Cloned because `config` is moved field-by-field into `ClientInner`
    // further down.
    let socks5 = config.socks5.clone();
    let mtproxy = config.mtproxy.clone();
    let transport = config.transport.clone();

    let (conn, home_dc_id, dc_opts, media_dc_opts, loaded_session) =
        match config.session_backend.load().map_err(InvocationError::Io)? {
            Some(s) => {
                if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
                    if let Some(key) = dc.auth_key {
                        tracing::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
                        // NOTE(review): this path passes `socks5` but not
                        // `mtproxy` — confirm that is intended.
                        match Connection::connect_with_key(
                            &dc.addr,
                            key,
                            dc.first_salt,
                            dc.time_offset,
                            socks5.as_ref(),
                            &transport,
                            s.home_dc_id as i16,
                        )
                        .await
                        {
                            Ok(c) => {
                                // Start from the built-in DC table, then
                                // overlay the entries stored in the session.
                                let mut opts = session::default_dc_addresses()
                                    .into_iter()
                                    .map(|(id, addr)| {
                                        (
                                            id,
                                            DcEntry {
                                                dc_id: id,
                                                addr,
                                                auth_key: None,
                                                first_salt: 0,
                                                time_offset: 0,
                                                flags: DcFlags::NONE,
                                            },
                                        )
                                    })
                                    .collect::<HashMap<_, _>>();
                                let mut media_opts: HashMap<i32, DcEntry> = HashMap::new();
                                // Media-only and CDN entries are kept in a
                                // separate table from regular DCs.
                                for d in &s.dcs {
                                    if d.flags.contains(DcFlags::MEDIA_ONLY)
                                        || d.flags.contains(DcFlags::CDN)
                                    {
                                        media_opts.insert(d.dc_id, d.clone());
                                    } else {
                                        opts.insert(d.dc_id, d.clone());
                                    }
                                }
                                (c, s.home_dc_id, opts, media_opts, Some(s))
                            }
                            Err(e) => {
                                // A failed reconnect with a stored key is
                                // surfaced to the caller; the session is
                                // not discarded here.
                                tracing::warn!(
                                    "[layer] Session connect failed ({e}): \
                                     returning error (delete session file to reset)"
                                );
                                return Err(e);
                            }
                        }
                    } else {
                        // Home DC entry exists but has no auth key.
                        let (c, dc, opts) =
                            Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport)
                                .await?;
                        (c, dc, opts, HashMap::new(), None)
                    }
                } else {
                    // Session has no entry for its own home DC.
                    let (c, dc, opts) =
                        Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport)
                            .await?;
                    (c, dc, opts, HashMap::new(), None)
                }
            }
            None => {
                // No stored session at all.
                let (c, dc, opts) =
                    Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport).await?;
                (c, dc, opts, HashMap::new(), None)
            }
        };

    let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());

    // Split the connection into the parts owned by the writer side and the
    // parts handed to the reader task.
    let (writer, write_half, read_half, frame_kind) = conn.into_writer();
    let auth_key = writer.enc.auth_key_bytes();
    let session_id = writer.enc.session_id();

    #[allow(clippy::type_complexity)]
    let pending: Arc<
        Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>,
    > = Arc::new(Mutex::new(HashMap::new()));

    let (reconnect_tx, reconnect_rx) =
        mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();

    let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();

    let shutdown_token = CancellationToken::new();
    let catch_up = config.catch_up;
    let restart_policy = config.restart_policy;

    let inner = Arc::new(ClientInner {
        writer: Mutex::new(writer),
        write_half: Mutex::new(write_half),
        pending: pending.clone(),
        reconnect_tx,
        network_hint_tx,
        shutdown_token: shutdown_token.clone(),
        catch_up,
        restart_policy,
        home_dc_id: Mutex::new(home_dc_id),
        dc_options: Mutex::new(dc_opts),
        media_dc_options: Mutex::new(media_dc_opts),
        peer_cache: RwLock::new(PeerCache::default()),
        pts_state: Mutex::new(pts::PtsState::default()),
        possible_gap: Mutex::new(pts::PossibleGapBuffer::new()),
        api_id: config.api_id,
        api_hash: config.api_hash,
        device_model: config.device_model,
        system_version: config.system_version,
        app_version: config.app_version,
        system_lang_code: config.system_lang_code,
        lang_pack: config.lang_pack,
        lang_code: config.lang_code,
        retry_policy: config.retry_policy,
        socks5: config.socks5,
        mtproxy: config.mtproxy,
        allow_ipv6: config.allow_ipv6,
        transport: config.transport,
        session_backend: config.session_backend,
        dc_pool: Mutex::new(pool),
        update_tx,
        is_bot: std::sync::atomic::AtomicBool::new(false),
        stream_active: std::sync::atomic::AtomicBool::new(false),
        salt_request_in_flight: std::sync::atomic::AtomicBool::new(false),
        dh_in_progress: std::sync::atomic::AtomicBool::new(false),
    });

    let client = Self {
        inner,
        _update_rx: Arc::new(Mutex::new(update_rx)),
    };

    {
        // The reader task owns the read half and a clone of the client.
        // NOTE(review): the error returns below do not cancel
        // `shutdown_token`; confirm the reader task terminates on its own
        // when `connect` fails after this point.
        let client_r = client.clone();
        let shutdown_r = shutdown_token.clone();
        tokio::spawn(async move {
            client_r
                .run_reader_task(
                    read_half,
                    frame_kind,
                    auth_key,
                    session_id,
                    reconnect_rx,
                    network_hint_rx,
                    shutdown_r,
                )
                .await;
        });
    }

    if let Err(e) = client.init_connection().await {
        // Only RPC error code -404 is treated as proof that the stored key
        // is unusable; every other failure is returned as-is.
        let key_is_stale = match &e {
            InvocationError::Rpc(r) if r.code == -404 => true,
            _ => false,
        };

        // The compare-exchange makes sure only one fresh key exchange runs
        // at a time. NOTE(review): the flag is not reset on the early error
        // returns inside this branch.
        let dh_allowed = key_is_stale
            && client
                .inner
                .dh_in_progress
                .compare_exchange(
                    false,
                    true,
                    std::sync::atomic::Ordering::SeqCst,
                    std::sync::atomic::Ordering::SeqCst,
                )
                .is_ok();

        if dh_allowed {
            tracing::warn!("[layer] init_connection: definitive bad-key ({e}), fresh DH …");
            {
                // Forget the stale key material for the home DC.
                let home_dc_id = *client.inner.home_dc_id.lock().await;
                let mut opts = client.inner.dc_options.lock().await;
                if let Some(entry) = opts.get_mut(&home_dc_id)
                    && entry.auth_key.is_some()
                {
                    tracing::warn!("[layer] Clearing stale auth key for DC{home_dc_id}");
                    entry.auth_key = None;
                    entry.first_salt = 0;
                    entry.time_offset = 0;
                }
            }
            // Best-effort save; a failure here is ignored.
            client.save_session().await.ok();
            // Dropping the pending senders abandons requests made with the
            // old key.
            client.inner.pending.lock().await.clear();

            let socks5_r = client.inner.socks5.clone();
            let mtproxy_r = client.inner.mtproxy.clone();
            let transport_r = client.inner.transport.clone();

            let home_dc_id_r = *client.inner.home_dc_id.lock().await;
            // Use the known address of the home DC, or the built-in
            // fallback when the table has no entry for it.
            let addr_r = {
                let opts = client.inner.dc_options.lock().await;
                opts.get(&home_dc_id_r)
                    .map(|e| e.addr.clone())
                    .unwrap_or_else(|| {
                        crate::dc_migration::fallback_dc_addr(home_dc_id_r).to_string()
                    })
            };
            let new_conn = Connection::connect_raw(
                &addr_r,
                socks5_r.as_ref(),
                mtproxy_r.as_ref(),
                &transport_r,
                home_dc_id_r as i16,
            )
            .await?;

            let (new_writer, new_wh, new_read, new_fk) = new_conn.into_writer();
            {
                // Record the new key material for the home DC.
                let mut opts_guard = client.inner.dc_options.lock().await;
                if let Some(entry) = opts_guard.get_mut(&home_dc_id_r) {
                    entry.auth_key = Some(new_writer.auth_key_bytes());
                    entry.first_salt = new_writer.first_salt();
                    entry.time_offset = new_writer.time_offset();
                }
            }
            let new_ak = new_writer.enc.auth_key_bytes();
            let new_sid = new_writer.enc.session_id();
            // Swap in the new writer side, then hand the new read half to
            // the reader task through the reconnect channel.
            *client.inner.writer.lock().await = new_writer;
            *client.inner.write_half.lock().await = new_wh;
            let _ = client
                .inner
                .reconnect_tx
                .send((new_read, new_fk, new_ak, new_sid));
            // Give the reader task a chance to run before retrying.
            tokio::task::yield_now().await;

            {
                // Retry `init_connection` on RPC code 401, waiting
                // 500 ms · 2^attempt between tries, for at most
                // MAX_ATTEMPTS retries. Any other error is returned.
                let mut attempt = 0u32;
                const MAX_ATTEMPTS: u32 = 5;
                loop {
                    match client.init_connection().await {
                        Ok(()) => break,
                        Err(InvocationError::Rpc(ref r))
                            if r.code == 401 && attempt < MAX_ATTEMPTS =>
                        {
                            let delay =
                                std::time::Duration::from_millis(500 * (1u64 << attempt));
                            tracing::warn!(
                                "[layer] init_connection AUTH_KEY_UNREGISTERED \
                                 (attempt {}/{MAX_ATTEMPTS}): key not yet propagated, \
                                 retrying in {delay:?}",
                                attempt + 1,
                            );
                            tokio::time::sleep(delay).await;
                            attempt += 1;
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
            client
                .inner
                .dh_in_progress
                .store(false, std::sync::atomic::Ordering::SeqCst);
            // Best-effort save of the new key; a failure here is ignored.
            client.save_session().await.ok();

            tracing::warn!(
                "[layer] Session invalidated and reset. \
                 Call is_authorized() and re-authenticate if needed."
            );
        } else {
            return Err(e);
        }
    }

    // Seed the peer cache from the session. `or_insert` keeps any value
    // that is already present.
    if let Some(ref s) = loaded_session
        && !s.peers.is_empty()
    {
        let mut cache = client.inner.peer_cache.write().await;
        for p in &s.peers {
            if p.is_channel {
                cache.channels.entry(p.id).or_insert(p.access_hash);
            } else {
                cache.users.entry(p.id).or_insert(p.access_hash);
            }
        }
        tracing::debug!(
            "[layer] Peer cache restored: {} users, {} channels",
            cache.users.len(),
            cache.channels.len()
        );
    }

    let has_saved_state = loaded_session
        .as_ref()
        .is_some_and(|s| s.updates_state.is_initialised());

    if catch_up && has_saved_state {
        // `has_saved_state` is only true when `loaded_session` is `Some`,
        // so this `unwrap` cannot fail.
        let snap = &loaded_session.as_ref().unwrap().updates_state;
        let mut state = client.inner.pts_state.lock().await;
        state.pts = snap.pts;
        state.qts = snap.qts;
        state.date = snap.date;
        state.seq = snap.seq;
        for &(cid, cpts) in &snap.channels {
            state.channel_pts.insert(cid, cpts);
        }
        tracing::info!(
            "[layer] Update state restored: pts={}, qts={}, seq={}, {} channels",
            state.pts,
            state.qts,
            state.seq,
            state.channel_pts.len()
        );
        // Release the lock before spawning work that needs it.
        drop(state);

        let channel_ids: Vec<i64> = snap.channels.iter().map(|&(cid, _)| cid).collect();

        let c = client.clone();
        let utx = client.inner.update_tx.clone();
        // Replay runs in the background so `connect` can return at once.
        tokio::spawn(async move {
            match c.get_difference().await {
                Ok(missed) => {
                    tracing::info!(
                        "[layer] catch_up: {} global updates replayed",
                        missed.len()
                    );
                    for u in missed {
                        // Stop at the first update that does not fit.
                        if utx.try_send(attach_client_to_update(u, &c)).is_err() {
                            tracing::warn!(
                                "[layer] update channel full: dropping catch-up update"
                            );
                            break;
                        }
                    }
                }
                Err(e) => tracing::warn!("[layer] catch_up getDifference: {e}"),
            }

            if !channel_ids.is_empty() {
                tracing::info!(
                    "[layer] catch_up: per-channel diff for {} channels",
                    channel_ids.len()
                );
                // At most 10 per-channel requests run at the same time.
                let sem = std::sync::Arc::new(tokio::sync::Semaphore::new(10));
                for channel_id in channel_ids {
                    let c2 = c.clone();
                    let utx2 = utx.clone();
                    // The semaphore is never closed, so `acquire_owned`
                    // cannot fail here.
                    let permit = sem.clone().acquire_owned().await.unwrap();
                    tokio::spawn(async move {
                        // Hold the permit until this task finishes.
                        let _permit = permit;
                        // NOTE(review): unlike the global path above, these
                        // updates are sent without
                        // `attach_client_to_update` — confirm
                        // `get_channel_difference` already attaches it.
                        match c2.get_channel_difference(channel_id).await {
                            Ok(updates) => {
                                if !updates.is_empty() {
                                    tracing::debug!(
                                        "[layer] catch_up channel {channel_id}: {} updates",
                                        updates.len()
                                    );
                                }
                                for u in updates {
                                    if utx2.try_send(u).is_err() {
                                        tracing::warn!(
                                            "[layer] update channel full: dropping channel diff update"
                                        );
                                        break;
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!("[layer] catch_up channel {channel_id}: {e}")
                            }
                        }
                    });
                }
            }
        });
    } else {
        // No catch-up: take the current state from the server. A failure
        // here is deliberately ignored.
        let _ = client.sync_pts_state().await;
    }

    Ok((client, shutdown_token))
}
1388
1389 async fn fresh_connect(
1390 socks5: Option<&crate::socks5::Socks5Config>,
1391 mtproxy: Option<&crate::proxy::MtProxyConfig>,
1392 transport: &TransportKind,
1393 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
1394 tracing::debug!("[layer] Fresh connect to DC2 …");
1395 let conn = Connection::connect_raw(
1396 crate::dc_migration::fallback_dc_addr(2),
1397 socks5,
1398 mtproxy,
1399 transport,
1400 2i16,
1401 )
1402 .await?;
1403 let opts = session::default_dc_addresses()
1404 .into_iter()
1405 .map(|(id, addr)| {
1406 (
1407 id,
1408 DcEntry {
1409 dc_id: id,
1410 addr,
1411 auth_key: None,
1412 first_salt: 0,
1413 time_offset: 0,
1414 flags: DcFlags::NONE,
1415 },
1416 )
1417 })
1418 .collect();
1419 Ok((conn, 2, opts))
1420 }
1421
1422 async fn build_persisted_session(&self) -> PersistedSession {
1430 use session::{CachedPeer, UpdatesStateSnap};
1431
1432 let writer_guard = self.inner.writer.lock().await;
1433 let home_dc_id = *self.inner.home_dc_id.lock().await;
1434 let dc_options = self.inner.dc_options.lock().await;
1435
1436 let mut dcs: Vec<DcEntry> = dc_options
1437 .values()
1438 .map(|e| DcEntry {
1439 dc_id: e.dc_id,
1440 addr: e.addr.clone(),
1441 auth_key: if e.dc_id == home_dc_id {
1442 Some(writer_guard.auth_key_bytes())
1443 } else {
1444 e.auth_key
1445 },
1446 first_salt: if e.dc_id == home_dc_id {
1447 writer_guard.first_salt()
1448 } else {
1449 e.first_salt
1450 },
1451 time_offset: if e.dc_id == home_dc_id {
1452 writer_guard.time_offset()
1453 } else {
1454 e.time_offset
1455 },
1456 flags: e.flags,
1457 })
1458 .collect();
1459 {
1461 let media_opts = self.inner.media_dc_options.lock().await;
1462 for e in media_opts.values() {
1463 dcs.push(e.clone());
1464 }
1465 }
1466 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
1467
1468 let pts_snap = {
1469 let s = self.inner.pts_state.lock().await;
1470 UpdatesStateSnap {
1471 pts: s.pts,
1472 qts: s.qts,
1473 date: s.date,
1474 seq: s.seq,
1475 channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
1476 }
1477 };
1478
1479 let peers: Vec<CachedPeer> = {
1480 let cache = self.inner.peer_cache.read().await;
1481 let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
1482 for (&id, &hash) in &cache.users {
1483 v.push(CachedPeer {
1484 id,
1485 access_hash: hash,
1486 is_channel: false,
1487 });
1488 }
1489 for (&id, &hash) in &cache.channels {
1490 v.push(CachedPeer {
1491 id,
1492 access_hash: hash,
1493 is_channel: true,
1494 });
1495 }
1496 v
1497 };
1498
1499 PersistedSession {
1500 home_dc_id,
1501 dcs,
1502 updates_state: pts_snap,
1503 peers,
1504 }
1505 }
1506
1507 pub async fn save_session(&self) -> Result<(), InvocationError> {
1509 let session = self.build_persisted_session().await;
1510 self.inner
1511 .session_backend
1512 .save(&session)
1513 .map_err(InvocationError::Io)?;
1514 tracing::debug!("[layer] Session saved ✓");
1515 Ok(())
1516 }
1517
1518 pub async fn export_session_string(&self) -> Result<String, InvocationError> {
1525 Ok(self.build_persisted_session().await.to_string())
1526 }
1527
1528 pub async fn media_dc_addr(&self, dc_id: i32) -> Option<String> {
1534 self.inner
1535 .media_dc_options
1536 .lock()
1537 .await
1538 .get(&dc_id)
1539 .map(|e| e.addr.clone())
1540 }
1541
1542 pub async fn best_media_dc_addr(&self) -> Option<(i32, String)> {
1545 let home = *self.inner.home_dc_id.lock().await;
1546 let media = self.inner.media_dc_options.lock().await;
1547 media
1548 .get(&home)
1549 .map(|e| (home, e.addr.clone()))
1550 .or_else(|| media.iter().next().map(|(&id, e)| (id, e.addr.clone())))
1551 }
1552
1553 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
1555 match self.invoke(&tl::functions::updates::GetState {}).await {
1556 Ok(_) => Ok(true),
1557 Err(e)
1558 if e.is("AUTH_KEY_UNREGISTERED")
1559 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) =>
1560 {
1561 Ok(false)
1562 }
1563 Err(e) => Err(e),
1564 }
1565 }
1566
1567 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
1569 let req = tl::functions::auth::ImportBotAuthorization {
1570 flags: 0,
1571 api_id: self.inner.api_id,
1572 api_hash: self.inner.api_hash.clone(),
1573 bot_auth_token: token.to_string(),
1574 };
1575
1576 let result = self.invoke(&req).await?;
1577
1578 let name = match result {
1579 tl::enums::auth::Authorization::Authorization(a) => {
1580 self.cache_user(&a.user).await;
1581 Self::extract_user_name(&a.user)
1582 }
1583 tl::enums::auth::Authorization::SignUpRequired(_) => {
1584 return Err(InvocationError::Deserialize(
1585 "unexpected SignUpRequired during bot sign-in".into(),
1586 ));
1587 }
1588 };
1589 tracing::info!("[layer] Bot signed in ✓ ({name})");
1590 self.inner
1591 .is_bot
1592 .store(true, std::sync::atomic::Ordering::Relaxed);
1593 Ok(name)
1594 }
1595
1596 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
1598 use tl::enums::auth::SentCode;
1599
1600 let req = self.make_send_code_req(phone);
1601 let body = self.rpc_call_raw(&req).await?;
1602
1603 let mut cur = Cursor::from_slice(&body);
1604 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
1605 SentCode::SentCode(s) => s.phone_code_hash,
1606 SentCode::Success(_) => {
1607 return Err(InvocationError::Deserialize("unexpected Success".into()));
1608 }
1609 SentCode::PaymentRequired(_) => {
1610 return Err(InvocationError::Deserialize(
1611 "payment required to send code".into(),
1612 ));
1613 }
1614 };
1615 tracing::info!("[layer] Login code sent");
1616 Ok(LoginToken {
1617 phone: phone.to_string(),
1618 phone_code_hash: hash,
1619 })
1620 }
1621
1622 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
1624 let req = tl::functions::auth::SignIn {
1625 phone_number: token.phone.clone(),
1626 phone_code_hash: token.phone_code_hash.clone(),
1627 phone_code: Some(code.trim().to_string()),
1628 email_verification: None,
1629 };
1630
1631 let body = match self.rpc_call_raw(&req).await {
1632 Ok(b) => b,
1633 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
1634 let t = self.get_password_info().await.map_err(SignInError::Other)?;
1635 return Err(SignInError::PasswordRequired(Box::new(t)));
1636 }
1637 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
1638 Err(e) => return Err(SignInError::Other(e)),
1639 };
1640
1641 let mut cur = Cursor::from_slice(&body);
1642 match tl::enums::auth::Authorization::deserialize(&mut cur)
1643 .map_err(|e| SignInError::Other(e.into()))?
1644 {
1645 tl::enums::auth::Authorization::Authorization(a) => {
1646 self.cache_user(&a.user).await;
1647 let name = Self::extract_user_name(&a.user);
1648 tracing::info!("[layer] Signed in ✓ Welcome, {name}!");
1649 Ok(name)
1650 }
1651 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
1652 }
1653 }
1654
1655 pub async fn check_password(
1657 &self,
1658 token: PasswordToken,
1659 password: impl AsRef<[u8]>,
1660 ) -> Result<String, InvocationError> {
1661 let pw = token.password;
1662 let algo = pw
1663 .current_algo
1664 .ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
1665 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
1666 let g_b = pw
1667 .srp_b
1668 .ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
1669 let a = pw.secure_random;
1670 let srp_id = pw
1671 .srp_id
1672 .ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
1673
1674 let (m1, g_a) =
1675 two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
1676 let req = tl::functions::auth::CheckPassword {
1677 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
1678 tl::types::InputCheckPasswordSrp {
1679 srp_id,
1680 a: g_a.to_vec(),
1681 m1: m1.to_vec(),
1682 },
1683 ),
1684 };
1685
1686 let body = self.rpc_call_raw(&req).await?;
1687 let mut cur = Cursor::from_slice(&body);
1688 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
1689 tl::enums::auth::Authorization::Authorization(a) => {
1690 self.cache_user(&a.user).await;
1691 let name = Self::extract_user_name(&a.user);
1692 tracing::info!("[layer] 2FA ✓ Welcome, {name}!");
1693 Ok(name)
1694 }
1695 tl::enums::auth::Authorization::SignUpRequired(_) => Err(InvocationError::Deserialize(
1696 "unexpected SignUpRequired after 2FA".into(),
1697 )),
1698 }
1699 }
1700
1701 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
1703 let req = tl::functions::auth::LogOut {};
1704 match self.rpc_call_raw(&req).await {
1705 Ok(_) => {
1706 tracing::info!("[layer] Signed out ✓");
1707 Ok(true)
1708 }
1709 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
1710 Err(e) => Err(e),
1711 }
1712 }
1713
1714 pub async fn get_users_by_id(
1722 &self,
1723 ids: &[i64],
1724 ) -> Result<Vec<Option<crate::types::User>>, InvocationError> {
1725 let cache = self.inner.peer_cache.read().await;
1726 let input_ids: Vec<tl::enums::InputUser> = ids
1727 .iter()
1728 .map(|&id| {
1729 if id == 0 {
1730 tl::enums::InputUser::UserSelf
1731 } else {
1732 let hash = cache.users.get(&id).copied().unwrap_or(0);
1733 tl::enums::InputUser::InputUser(tl::types::InputUser {
1734 user_id: id,
1735 access_hash: hash,
1736 })
1737 }
1738 })
1739 .collect();
1740 drop(cache);
1741 let req = tl::functions::users::GetUsers { id: input_ids };
1742 let body = self.rpc_call_raw(&req).await?;
1743 let mut cur = Cursor::from_slice(&body);
1744 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1745 self.cache_users_slice(&users).await;
1746 Ok(users
1747 .into_iter()
1748 .map(crate::types::User::from_raw)
1749 .collect())
1750 }
1751
1752 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
1754 let req = tl::functions::users::GetUsers {
1755 id: vec![tl::enums::InputUser::UserSelf],
1756 };
1757 let body = self.rpc_call_raw(&req).await?;
1758 let mut cur = Cursor::from_slice(&body);
1759 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1760 self.cache_users_slice(&users).await;
1761 users
1762 .into_iter()
1763 .find_map(|u| match u {
1764 tl::enums::User::User(u) => Some(u),
1765 _ => None,
1766 })
1767 .ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
1768 }
1769
1770 pub fn stream_updates(&self) -> UpdateStream {
1778 if self
1782 .inner
1783 .stream_active
1784 .swap(true, std::sync::atomic::Ordering::SeqCst)
1785 {
1786 panic!(
1787 "stream_updates() called twice on the same Client: only one UpdateStream is supported per client"
1788 );
1789 }
1790 let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
1791 let internal_rx = self._update_rx.clone();
1792 tokio::spawn(async move {
1793 let mut guard = internal_rx.lock().await;
1794 while let Some(upd) = guard.recv().await {
1795 if caller_tx.send(upd).is_err() {
1796 break;
1797 }
1798 }
1799 });
1800 UpdateStream { rx }
1801 }
1802
1803 pub fn signal_network_restored(&self) {
1816 let _ = self.inner.network_hint_tx.send(());
1817 }
1818
    /// Supervisor around [`Self::reader_loop`].
    ///
    /// Runs `reader_loop` on the given connection until `shutdown_token` is
    /// cancelled. Whenever `reader_loop` returns without a shutdown having
    /// been requested, the supervisor:
    ///
    /// 1. fails every pending request with a `ConnectionReset` I/O error and
    ///    clears the writer's `sent_bodies`,
    /// 2. retries `do_reconnect` with a growing, jittered delay until it
    ///    succeeds (or shutdown is requested),
    /// 3. spawns a task that runs `init_connection` and, on success,
    ///    `get_difference`, forwarding the resulting updates to `update_tx`,
    /// 4. restarts `reader_loop` on the new connection, handing it the
    ///    oneshot receiver that will carry the init task's result.
    ///
    /// On shutdown, pending requests are failed with
    /// `InvocationError::Dropped` and the function returns.
    ///
    /// * `read_half`, `frame_kind`, `auth_key`, `session_id` — the initial
    ///   connection state handed to the first `reader_loop` run.
    /// * `new_conn_rx`, `network_hint_rx` — channels passed through to
    ///   `reader_loop` by mutable reference.
    /// * `shutdown_token` — cancelling it stops the task.
    #[allow(clippy::too_many_arguments)]
    async fn run_reader_task(
        &self,
        read_half: OwnedReadHalf,
        frame_kind: FrameKind,
        auth_key: [u8; 256],
        session_id: i64,
        mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
        mut network_hint_rx: mpsc::UnboundedReceiver<()>,
        shutdown_token: CancellationToken,
    ) {
        // Connection state for the next `reader_loop` run; replaced after each reconnect.
        let mut rh = read_half;
        let mut fk = frame_kind;
        let mut ak = auth_key;
        let mut sid = session_id;
        // Result channel of the init task spawned after a supervisor reconnect
        // (`None` for the very first run).
        let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
        let mut restart_count: u32 = 0;

        loop {
            tokio::select! {
                _ = shutdown_token.cancelled() => {
                    tracing::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
                    // Fail every in-flight request so callers are not left waiting.
                    let mut pending = self.inner.pending.lock().await;
                    for (_, tx) in pending.drain() {
                        let _ = tx.send(Err(InvocationError::Dropped));
                    }
                    return;
                }

                // `rh`/`fk`/`ak`/`sid` are moved into the loop here and
                // reassigned below before the next iteration.
                _ = self.reader_loop(
                    rh, fk, ak, sid,
                    restart_init_rx.take(),
                    &mut new_conn_rx, &mut network_hint_rx,
                ) => {}
            }

            // `reader_loop` returned: re-check shutdown before treating it as a failure.
            if shutdown_token.is_cancelled() {
                tracing::debug!("[layer] Reader task: exiting after loop (shutdown).");
                return;
            }

            restart_count += 1;
            tracing::error!(
                "[layer] Reader loop exited unexpectedly (restart #{restart_count}): supervisor reconnecting …"
            );

            {
                // Fail all in-flight requests; the guard is scoped so it is
                // released before the writer lock below is taken.
                let mut pending = self.inner.pending.lock().await;
                for (_, tx) in pending.drain() {
                    let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
                        std::io::ErrorKind::ConnectionReset,
                        "reader task restarted",
                    ))));
                }
            }
            self.inner.writer.lock().await.sent_bodies.clear();

            // Reconnect with a doubling delay, capped at RECONNECT_MAX_SECS
            // and passed through `jitter_delay`.
            let mut delay_ms = RECONNECT_BASE_MS;
            let new_conn = loop {
                tracing::debug!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
                tokio::select! {
                    _ = shutdown_token.cancelled() => {
                        tracing::debug!("[layer] Supervisor: shutdown during reconnect, exiting.");
                        return;
                    }
                    _ = sleep(Duration::from_millis(delay_ms)) => {}
                }

                // NOTE(review): placeholder key / frame kind are passed here;
                // presumably `do_reconnect` derives the real values itself — confirm.
                let dummy_ak = [0u8; 256];
                let dummy_fk = FrameKind::Abridged;
                match self.do_reconnect(&dummy_ak, &dummy_fk).await {
                    Ok(conn) => break conn,
                    Err(e) => {
                        tracing::warn!("[layer] Supervisor: reconnect failed ({e})");
                        let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
                        delay_ms = jitter_delay(next).as_millis() as u64;
                    }
                }
            };

            let (new_rh, new_fk, new_ak, new_sid) = new_conn;
            rh = new_rh;
            fk = new_fk;
            ak = new_ak;
            sid = new_sid;

            // The init task reports its result through this oneshot; the
            // receiver is handed to the next `reader_loop` run.
            let (init_tx, init_rx) = oneshot::channel();
            let c = self.clone();
            let utx = self.inner.update_tx.clone();
            tokio::spawn(async move {
                // Retry init_connection for as long as the failure is a flood wait.
                let result = loop {
                    match c.init_connection().await {
                        Ok(()) => break Ok(()),
                        Err(InvocationError::Rpc(ref r)) if r.flood_wait_seconds().is_some() => {
                            let secs = r.flood_wait_seconds().unwrap();
                            tracing::warn!(
                                "[layer] Supervisor init_connection FLOOD_WAIT_{secs}: waiting"
                            );
                            sleep(Duration::from_secs(secs + 1)).await;
                        }
                        Err(e) => break Err(e),
                    }
                };
                if result.is_ok() {
                    // Catch up on updates missed while disconnected.
                    let missed = {
                        let mut attempt = 0u32;
                        const MAX_ATTEMPTS: u32 = 5;
                        loop {
                            match c.get_difference().await {
                                Ok(updates) => break updates,
                                // RPC code 401: retry with a delay of 500 ms * 2^attempt.
                                Err(ref e)
                                    if matches!(e,
                                        InvocationError::Rpc(r) if r.code == 401)
                                        && attempt < MAX_ATTEMPTS =>
                                {
                                    let delay = Duration::from_millis(500 * (1u64 << attempt));
                                    tracing::warn!(
                                        "[layer] getDifference AUTH_KEY_UNREGISTERED \
                                         (attempt {}/{MAX_ATTEMPTS}): retrying in {delay:?}",
                                        attempt + 1,
                                    );
                                    sleep(delay).await;
                                    attempt += 1;
                                }
                                // Still 401 after all retries: fall back to
                                // `sync_pts_state` (its result is ignored) and deliver nothing.
                                Err(ref e)
                                    if matches!(e,
                                        InvocationError::Rpc(r) if r.code == 401) =>
                                {
                                    tracing::warn!(
                                        "[layer] getDifference AUTH_KEY_UNREGISTERED \
                                         after {MAX_ATTEMPTS} retries: falling back to \
                                         sync_pts_state"
                                    );
                                    let _ = c.sync_pts_state().await;
                                    break vec![];
                                }
                                // Any other failure: log and continue without catch-up updates.
                                Err(e) => {
                                    tracing::warn!(
                                        "[layer] getDifference failed after reconnect: {e}"
                                    );
                                    break vec![];
                                }
                            }
                        }
                    };
                    // Best-effort delivery: stop at the first update that cannot be queued.
                    for u in missed {
                        if utx.try_send(u).is_err() {
                            tracing::warn!("[layer] update channel full: dropping catch-up update");
                            break;
                        }
                    }
                }
                // The receiver may already be gone; ignore the send error.
                let _ = init_tx.send(result);
            });
            restart_init_rx = Some(init_rx);

            tracing::debug!(
                "[layer] Supervisor: restarting reader loop (restart #{restart_count}) …"
            );
        }
    }
2035
    /// Inner read loop for one logical connection.
    ///
    /// Multiplexes, via `tokio::select!`:
    /// * a 1.5 s "gap" timer that triggers `check_update_deadline` when a
    ///   global gap deadline has elapsed,
    /// * an optional scheduled-restart timer from the restart policy,
    /// * incoming frames (decrypted and passed to `route_frame`),
    /// * replacement connections arriving on `new_conn_rx`,
    /// * the result of an in-flight `init_connection` task (`init_rx`).
    ///
    /// On decrypt or connection errors it fails all pending requests,
    /// clears `sent_bodies` and calls `do_reconnect_loop`, which updates
    /// `rh`/`fk`/`ak`/`sid` in place. The function returns when
    /// `do_reconnect_loop` yields `None` or when `new_conn_rx` is closed.
    ///
    /// * `initial_init_rx` — receiver for an init task that was started by
    ///   the caller before this loop began, if any.
    #[allow(clippy::too_many_arguments)]
    async fn reader_loop(
        &self,
        mut rh: OwnedReadHalf,
        mut fk: FrameKind,
        mut ak: [u8; 256],
        mut sid: i64,
        initial_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>>,
        new_conn_rx: &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
        network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
    ) {
        // Receiver for the currently outstanding init task, if any.
        let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
        // Consecutive init failures that were not a stale-key claim (used for logging only).
        let mut init_fail_count: u32 = 0;

        let mut gap_tick = tokio::time::interval(std::time::Duration::from_millis(1500));
        gap_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        let mut restart_interval = self.inner.restart_policy.restart_interval().map(|d| {
            let mut i = tokio::time::interval(d);
            i.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            i
        });
        // A tokio interval completes its first tick immediately; consume it
        // so the first scheduled restart happens one full period from now.
        if let Some(ref mut i) = restart_interval {
            i.tick().await;
        }

        loop {
            tokio::select! {
                _ = gap_tick.tick() => {
                    // Both checks take and release the lock separately; the
                    // actual work runs in a spawned task so the reader is not blocked.
                    if self.inner.possible_gap.lock().await.has_global() {
                        let gap_expired = self.inner.possible_gap.lock().await.global_deadline_elapsed();
                        if gap_expired {
                            let c = self.clone();
                            tokio::spawn(async move {
                                if let Err(e) = c.check_update_deadline().await {
                                    tracing::warn!("[layer] gap tick getDifference: {e}");
                                }
                            });
                        }
                    }
                }
                // Scheduled restart; never fires when the policy supplies no interval.
                _ = async {
                    if let Some(ref mut i) = restart_interval { i.tick().await; }
                    else { std::future::pending::<()>().await; }
                } => {
                    tracing::info!("[layer] scheduled restart: reconnecting");
                    // Shut the write half down and send a network hint.
                    // NOTE(review): presumably the resulting read error then
                    // drives the `FrameOutcome::Error` reconnect path — confirm.
                    let _ = self.inner.write_half.lock().await.shutdown().await;
                    let _ = self.inner.network_hint_tx.send(());
                }
                outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
                    match outcome {
                        FrameOutcome::Frame(mut raw) => {
                            let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
                                Ok(m) => m,
                                Err(e) => {
                                    tracing::warn!("[layer] Decrypt error: {e:?}: failing pending waiters and reconnecting");
                                    // Abandon any outstanding init result.
                                    drop(init_rx.take());
                                    {
                                        let mut pending = self.inner.pending.lock().await;
                                        let msg = format!("decrypt error: {e}");
                                        for (_, tx) in pending.drain() {
                                            let _ = tx.send(Err(InvocationError::Io(
                                                std::io::Error::new(
                                                    std::io::ErrorKind::InvalidData,
                                                    msg.clone(),
                                                )
                                            )));
                                        }
                                    }
                                    self.inner.writer.lock().await.sent_bodies.clear();
                                    match self.do_reconnect_loop(
                                        RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
                                        network_hint_rx,
                                    ).await {
                                        Some(rx) => { init_rx = Some(rx); }
                                        None => return,
                                    }
                                    continue;
                                }
                            };
                            self.route_frame(msg.body, msg.msg_id).await;

                        }

                        FrameOutcome::Error(e) => {
                            tracing::warn!("[layer] Reader: connection error: {e}");
                            drop(init_rx.take());
                            // RPC code -404 is the only error treated as a stale auth key here.
                            let key_is_stale = match &e {
                                InvocationError::Rpc(r) if r.code == -404 => true,
                                _ => false,
                            };
                            // Clear the key only if this path wins the
                            // `dh_in_progress` flag (false -> true).
                            let clear_key = key_is_stale
                                && self.inner.dh_in_progress
                                    .compare_exchange(false, true,
                                        std::sync::atomic::Ordering::SeqCst,
                                        std::sync::atomic::Ordering::SeqCst)
                                    .is_ok();
                            if clear_key {
                                let home_dc_id = *self.inner.home_dc_id.lock().await;
                                let mut opts = self.inner.dc_options.lock().await;
                                if let Some(entry) = opts.get_mut(&home_dc_id) {
                                    tracing::warn!(
                                        "[layer] Stale auth key on DC{home_dc_id} ({e}) \
                                         : clearing for fresh DH"
                                    );
                                    entry.auth_key = None;
                                }
                            }

                            {
                                let mut pending = self.inner.pending.lock().await;
                                let msg = e.to_string();
                                for (_, tx) in pending.drain() {
                                    let _ = tx.send(Err(InvocationError::Io(
                                        std::io::Error::new(
                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
                                }
                            }
                            self.inner.writer.lock().await.sent_bodies.clear();

                            // No initial back-off when the key was just cleared.
                            let reconnect_delay = if clear_key { 0 } else { RECONNECT_BASE_MS };
                            // NOTE(review): `dh_in_progress` is reset to false in
                            // both outcomes below even when `clear_key` is false,
                            // i.e. when this path did not set the flag itself —
                            // confirm this is intended.
                            match self.do_reconnect_loop(
                                reconnect_delay, &mut rh, &mut fk, &mut ak, &mut sid,
                                network_hint_rx,
                            ).await {
                                Some(rx) => {
                                    self.inner.dh_in_progress
                                        .store(false, std::sync::atomic::Ordering::SeqCst);
                                    init_rx = Some(rx);
                                }
                                None => {
                                    self.inner.dh_in_progress
                                        .store(false, std::sync::atomic::Ordering::SeqCst);
                                    return;
                                }
                            }
                        }

                        FrameOutcome::Keepalive => {
                            // Run the update-deadline check off the reader task.
                            let c = self.clone();
                            tokio::spawn(async move {
                                if let Err(e) = c.check_update_deadline().await {
                                    tracing::warn!("[layer] check_update_deadline: {e}");
                                }
                            });
                        }
                    }
                }

                maybe = new_conn_rx.recv() => {
                    if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
                        // Swap to the connection supplied from outside.
                        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
                        tracing::debug!("[layer] Reader: switched to new connection.");
                    } else {
                        // Channel closed: leave the loop, which ends this function.
                        break;
                    }
                }


                // Polled only while an init task is outstanding (guarded by
                // `if init_rx.is_some()`, which makes the `unwrap` safe).
                init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
                    init_rx = None;
                    match init_result {
                        Ok(Ok(())) => {
                            init_fail_count = 0;
                            tracing::info!("[layer] Reconnected to Telegram ✓: session live, replaying missed updates …");
                        }

                        Ok(Err(e)) => {
                            let key_is_stale = match &e {
                                InvocationError::Rpc(r) if r.code == -404 => true,
                                _ => false,
                            };
                            let dh_claimed = key_is_stale
                                && self.inner.dh_in_progress
                                    .compare_exchange(false, true,
                                        std::sync::atomic::Ordering::SeqCst,
                                        std::sync::atomic::Ordering::SeqCst)
                                    .is_ok();

                            // NOTE(review): when `dh_claimed` is true the flag is
                            // set here but this arm never stores `false` again,
                            // and unlike the other failure paths it does not
                            // clear `sent_bodies` — confirm both are intended.
                            if dh_claimed {
                                tracing::warn!(
                                    "[layer] init_connection: definitive bad-key ({e}) \
                                     : clearing auth key for fresh DH …"
                                );
                                init_fail_count = 0;
                                let home_dc_id = *self.inner.home_dc_id.lock().await;
                                let mut opts = self.inner.dc_options.lock().await;
                                if let Some(entry) = opts.get_mut(&home_dc_id) {
                                    entry.auth_key = None;
                                }
                            } else {
                                init_fail_count += 1;
                                tracing::warn!(
                                    "[layer] init_connection failed (attempt {init_fail_count}, {e}) \
                                     : retrying with same key …"
                                );
                            }
                            {
                                let mut pending = self.inner.pending.lock().await;
                                let msg = e.to_string();
                                for (_, tx) in pending.drain() {
                                    let _ = tx.send(Err(InvocationError::Io(
                                        std::io::Error::new(
                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
                                }
                            }
                            match self.do_reconnect_loop(
                                0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None => return,
                            }
                        }

                        // The oneshot sender was dropped without sending a result.
                        Err(_) => {
                            tracing::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
                            match self.do_reconnect_loop(
                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
                                network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None => return,
                            }
                        }
                    }
                }
            }
        }
    }
2336
2337 async fn route_frame(&self, body: Vec<u8>, msg_id: i64) {
2339 if body.len() < 4 {
2340 return;
2341 }
2342 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2343
2344 match cid {
2345 ID_RPC_RESULT => {
2346 if body.len() < 12 {
2347 return;
2348 }
2349 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2350 let inner = body[12..].to_vec();
2351 self.inner.writer.lock().await.pending_ack.push(msg_id);
2353 let result = unwrap_envelope(inner);
2354 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
2355 self.inner
2357 .writer
2358 .lock()
2359 .await
2360 .sent_bodies
2361 .remove(&req_msg_id);
2362 self.inner
2364 .writer
2365 .lock()
2366 .await
2367 .container_map
2368 .retain(|_, inner| *inner != req_msg_id);
2369 let to_send = match result {
2370 Ok(EnvelopeResult::Payload(p)) => Ok(p),
2371 Ok(EnvelopeResult::RawUpdates(bodies)) => {
2372 let c = self.clone();
2377 tokio::spawn(async move {
2378 for body in bodies {
2379 c.dispatch_updates(&body).await;
2380 }
2381 });
2382 Ok(vec![])
2383 }
2384 Ok(EnvelopeResult::Pts(pts, pts_count)) => {
2385 let c = self.clone();
2387 tokio::spawn(async move {
2388 match c.check_and_fill_gap(pts, pts_count, None).await {
2389 Ok(replayed) => {
2390 for u in replayed {
2392 let _ = c.inner.update_tx.try_send(u);
2393 }
2394 }
2395 Err(e) => tracing::warn!(
2396 "[layer] updateShortSentMessage pts advance: {e}"
2397 ),
2398 }
2399 });
2400 Ok(vec![])
2401 }
2402 Ok(EnvelopeResult::None) => Ok(vec![]),
2403 Err(e) => {
2404 tracing::debug!(
2405 "[layer] rpc_result deserialize failure for msg_id={req_msg_id}: {e}"
2406 );
2407 Err(e)
2408 }
2409 };
2410 let _ = tx.send(to_send);
2411 }
2412 }
2413 ID_RPC_ERROR => {
2414 tracing::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
2415 }
2416 ID_MSG_CONTAINER => {
2417 if body.len() < 8 {
2418 return;
2419 }
2420 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
2421 let mut pos = 8usize;
2422 for _ in 0..count {
2423 if pos + 16 > body.len() {
2424 break;
2425 }
2426 let inner_msg_id = i64::from_le_bytes(body[pos..pos + 8].try_into().unwrap());
2428 let inner_len =
2429 u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
2430 pos += 16;
2431 if pos + inner_len > body.len() {
2432 break;
2433 }
2434 let inner = body[pos..pos + inner_len].to_vec();
2435 pos += inner_len;
2436 Box::pin(self.route_frame(inner, inner_msg_id)).await;
2437 }
2438 }
2439 ID_GZIP_PACKED => {
2440 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
2441 if let Ok(inflated) = gz_inflate(&bytes) {
2442 Box::pin(self.route_frame(inflated, msg_id)).await;
2444 }
2445 }
2446 ID_BAD_SERVER_SALT => {
2447 if body.len() >= 28 {
2454 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2455 let new_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
2456
2457 {
2460 let mut w = self.inner.writer.lock().await;
2461 w.salts.clear();
2462 w.salts.push(FutureSalt {
2463 valid_since: 0,
2464 valid_until: i32::MAX,
2465 salt: new_salt,
2466 });
2467 w.enc.salt = new_salt;
2468 }
2469 tracing::debug!(
2470 "[layer] bad_server_salt: bad_msg_id={bad_msg_id} new_salt={new_salt:#x}"
2471 );
2472
2473 {
2478 let mut w = self.inner.writer.lock().await;
2479
2480 let resolved_id = if w.sent_bodies.contains_key(&bad_msg_id) {
2482 bad_msg_id
2483 } else if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2484 w.container_map.remove(&bad_msg_id);
2485 inner_id
2486 } else {
2487 bad_msg_id };
2489
2490 if let Some(orig_body) = w.sent_bodies.remove(&resolved_id) {
2491 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2492 let fk = w.frame_kind.clone();
2493 drop(w);
2496 let mut pending = self.inner.pending.lock().await;
2497 if let Some(tx) = pending.remove(&resolved_id) {
2498 pending.insert(new_msg_id, tx);
2499 drop(pending);
2500 if let Err(e) = send_frame_write(
2501 &mut *self.inner.write_half.lock().await,
2502 &wire,
2503 &fk,
2504 )
2505 .await
2506 {
2507 tracing::warn!("[layer] bad_server_salt re-send failed: {e}");
2508 } else {
2509 tracing::debug!(
2510 "[layer] bad_server_salt re-sent \
2511 {resolved_id}→{new_msg_id}"
2512 );
2513 }
2514 }
2515 } else {
2516 drop(w);
2519 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
2520 let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
2521 std::io::ErrorKind::InvalidData,
2522 "bad_server_salt on re-sent message; caller should retry",
2523 ))));
2524 }
2525 }
2526 }
2527
2528 self.spawn_salt_fetch_if_needed();
2530 }
2531 }
2532 ID_PONG => {
2533 if body.len() >= 20 {
2539 let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2540 self.inner.writer.lock().await.pending_ack.push(msg_id);
2542 if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
2543 let mut w = self.inner.writer.lock().await;
2544 w.sent_bodies.remove(&ping_msg_id);
2545 w.container_map.retain(|_, inner| *inner != ping_msg_id);
2546 drop(w);
2547 let _ = tx.send(Ok(body));
2548 }
2549 }
2550 }
2551 ID_FUTURE_SALTS => {
2553 self.inner.writer.lock().await.pending_ack.push(msg_id);
2567
2568 if body.len() >= 24 {
2569 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2570 let server_now = i32::from_le_bytes(body[12..16].try_into().unwrap());
2571 let count = u32::from_le_bytes(body[20..24].try_into().unwrap()) as usize;
2572
2573 let mut new_salts: Vec<FutureSalt> =
2576 Vec::with_capacity(count.clamp(0, 4096) as usize);
2577 for i in 0..count {
2578 let base = 24 + i * 16;
2579 if base + 16 > body.len() {
2580 break;
2581 }
2582 new_salts.push(FutureSalt {
2588 valid_since: i32::from_le_bytes(
2589 body[base..base + 4].try_into().unwrap(),
2590 ),
2591 salt: i64::from_le_bytes(body[base + 4..base + 12].try_into().unwrap()),
2592 valid_until: i32::from_le_bytes(
2593 body[base + 12..base + 16].try_into().unwrap(),
2594 ),
2595 });
2596 }
2597
2598 if !new_salts.is_empty() {
2599 new_salts.sort_by_key(|s| s.valid_since);
2602 let mut w = self.inner.writer.lock().await;
2603 w.salts = new_salts;
2604 w.start_salt_time = Some((server_now, std::time::Instant::now()));
2605
2606 let use_salt = w
2611 .salts
2612 .iter()
2613 .rev()
2614 .find(|s| s.valid_since + SALT_USE_DELAY <= server_now)
2615 .or_else(|| w.salts.first())
2616 .map(|s| s.salt);
2617 if let Some(salt) = use_salt {
2618 w.enc.salt = salt;
2619 tracing::debug!(
2620 "[layer] FutureSalts: stored {} salts, \
2621 active salt={salt:#x}",
2622 w.salts.len()
2623 );
2624 }
2625 }
2626
2627 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
2628 let mut w = self.inner.writer.lock().await;
2629 w.sent_bodies.remove(&req_msg_id);
2630 w.container_map.retain(|_, inner| *inner != req_msg_id);
2631 drop(w);
2632 let _ = tx.send(Ok(body));
2633 }
2634 }
2635 }
2636 ID_NEW_SESSION => {
2637 if body.len() >= 28 {
2642 let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
2643 let mut w = self.inner.writer.lock().await;
2644 w.pending_ack.push(msg_id);
2646 w.salts.clear();
2649 w.salts.push(FutureSalt {
2650 valid_since: 0,
2651 valid_until: i32::MAX,
2652 salt: server_salt,
2653 });
2654 w.enc.salt = server_salt;
2655 tracing::debug!(
2656 "[layer] new_session_created: salt pool reset to {server_salt:#x}"
2657 );
2658 }
2659 }
2660 ID_BAD_MSG_NOTIFY => {
2662 if body.len() < 20 {
2664 return;
2665 }
2666 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2667 let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
2668
2669 let description = match error_code {
2671 16 => "msg_id too low",
2672 17 => "msg_id too high",
2673 18 => "incorrect two lower order msg_id bits (bug)",
2674 19 => "container msg_id is same as previously received (bug)",
2675 20 => "message too old",
2676 32 => "msg_seqno too low",
2677 33 => "msg_seqno too high",
2678 34 => "even msg_seqno expected (bug)",
2679 35 => "odd msg_seqno expected (bug)",
2680 48 => "incorrect server salt",
2681 64 => "invalid container (bug)",
2682 _ => "unknown bad_msg code",
2683 };
2684
2685 let retryable = matches!(error_code, 16 | 17 | 48);
2687 let fatal = !retryable && !matches!(error_code, 32 | 33);
2688
2689 if fatal {
2690 tracing::error!(
2691 "[layer] bad_msg_notification (fatal): bad_msg_id={bad_msg_id} \
2692 code={error_code}: {description}"
2693 );
2694 } else {
2695 tracing::warn!(
2696 "[layer] bad_msg_notification: bad_msg_id={bad_msg_id} \
2697 code={error_code}: {description}"
2698 );
2699 }
2700
2701 let resend: Option<(Vec<u8>, i64, i64, FrameKind)> = {
2705 let mut w = self.inner.writer.lock().await;
2706
2707 if error_code == 16 || error_code == 17 {
2709 w.enc.correct_time_offset(msg_id);
2710 }
2711 if error_code == 32 || error_code == 33 {
2713 w.enc.correct_seq_no(error_code);
2714 }
2715
2716 if retryable {
2717 let resolved_id = if w.sent_bodies.contains_key(&bad_msg_id) {
2721 bad_msg_id
2722 } else if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2723 w.container_map.remove(&bad_msg_id);
2724 inner_id
2725 } else {
2726 bad_msg_id
2727 };
2728
2729 if let Some(orig_body) = w.sent_bodies.remove(&resolved_id) {
2730 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2731 let fk = w.frame_kind.clone();
2732 w.sent_bodies.insert(new_msg_id, orig_body);
2733 Some((wire, resolved_id, new_msg_id, fk))
2735 } else {
2736 None
2737 }
2738 } else {
2739 w.sent_bodies.remove(&bad_msg_id);
2741 if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2742 w.sent_bodies.remove(&inner_id);
2743 w.container_map.remove(&bad_msg_id);
2744 }
2745 None
2746 }
2747 }; match resend {
2750 Some((wire, old_msg_id, new_msg_id, fk)) => {
2751 let has_waiter = {
2753 let mut pending = self.inner.pending.lock().await;
2754 if let Some(tx) = pending.remove(&old_msg_id) {
2755 pending.insert(new_msg_id, tx);
2756 true
2757 } else {
2758 false
2759 }
2760 };
2761 if has_waiter {
2762 if let Err(e) = send_frame_write(
2764 &mut *self.inner.write_half.lock().await,
2765 &wire,
2766 &fk,
2767 )
2768 .await
2769 {
2770 tracing::warn!("[layer] re-send failed: {e}");
2771 self.inner
2772 .writer
2773 .lock()
2774 .await
2775 .sent_bodies
2776 .remove(&new_msg_id);
2777 } else {
2778 tracing::debug!("[layer] re-sent {old_msg_id}→{new_msg_id}");
2779 }
2780 } else {
2781 self.inner
2782 .writer
2783 .lock()
2784 .await
2785 .sent_bodies
2786 .remove(&new_msg_id);
2787 }
2788 }
2789 None => {
2790 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
2792 let _ = tx.send(Err(InvocationError::Deserialize(format!(
2793 "bad_msg_notification code={error_code} ({description})"
2794 ))));
2795 }
2796 }
2797 }
2798 }
2799 ID_MSG_DETAILED_INFO => {
2801 if body.len() >= 20 {
2805 let answer_msg_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
2806 self.inner
2807 .writer
2808 .lock()
2809 .await
2810 .pending_ack
2811 .push(answer_msg_id);
2812 tracing::trace!(
2813 "[layer] MsgDetailedInfo: queued ack for answer_msg_id={answer_msg_id}"
2814 );
2815 }
2816 }
2817 ID_MSG_NEW_DETAIL_INFO => {
2818 if body.len() >= 12 {
2821 let answer_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2822 self.inner
2823 .writer
2824 .lock()
2825 .await
2826 .pending_ack
2827 .push(answer_msg_id);
2828 tracing::trace!("[layer] MsgNewDetailedInfo: queued ack for {answer_msg_id}");
2829 }
2830 }
2831 ID_MSG_RESEND_REQ => {
2833 if body.len() >= 12 {
2838 let count = u32::from_le_bytes(body[8..12].try_into().unwrap()) as usize;
2839 let mut resends: Vec<(Vec<u8>, i64, i64)> = Vec::new();
2840 {
2841 let mut w = self.inner.writer.lock().await;
2842 let fk = w.frame_kind.clone();
2843 for i in 0..count {
2844 let off = 12 + i * 8;
2845 if off + 8 > body.len() {
2846 break;
2847 }
2848 let resend_id =
2849 i64::from_le_bytes(body[off..off + 8].try_into().unwrap());
2850 if let Some(orig_body) = w.sent_bodies.remove(&resend_id) {
2851 let (wire, new_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2852 let mut pending = self.inner.pending.lock().await;
2853 if let Some(tx) = pending.remove(&resend_id) {
2854 pending.insert(new_id, tx);
2855 }
2856 drop(pending);
2857 w.sent_bodies.insert(new_id, orig_body);
2858 resends.push((wire, resend_id, new_id));
2859 }
2860 }
2861 let _ = fk; }
2863 let fk = self.inner.writer.lock().await.frame_kind.clone();
2865 for (wire, resend_id, new_id) in resends {
2866 send_frame_write(&mut *self.inner.write_half.lock().await, &wire, &fk)
2867 .await
2868 .ok();
2869 tracing::debug!("[layer] MsgResendReq: resent {resend_id} → {new_id}");
2870 }
2871 }
2872 }
2873 0xe22045fc => {
2875 tracing::warn!("[layer] destroy_session_ok received: session terminated by server");
2876 }
2877 0x62d350c9 => {
2878 tracing::warn!("[layer] destroy_session_none received: session was already gone");
2879 }
2880 ID_UPDATES
2881 | ID_UPDATE_SHORT
2882 | ID_UPDATES_COMBINED
2883 | ID_UPDATE_SHORT_MSG
2884 | ID_UPDATE_SHORT_CHAT_MSG
2885 | ID_UPDATE_SHORT_SENT_MSG
2886 | ID_UPDATES_TOO_LONG => {
2887 self.inner.writer.lock().await.pending_ack.push(msg_id);
2889 self.dispatch_updates(&body).await;
2891 }
2892 _ => {}
2893 }
2894 }
2895
2896 fn update_sort_key(upd: &tl::enums::Update) -> i32 {
2906 use tl::enums::Update::*;
2907 match upd {
2908 NewMessage(u) => u.pts - u.pts_count,
2909 EditMessage(u) => u.pts - u.pts_count,
2910 DeleteMessages(u) => u.pts - u.pts_count,
2911 ReadHistoryInbox(u) => u.pts - u.pts_count,
2912 ReadHistoryOutbox(u) => u.pts - u.pts_count,
2913 NewChannelMessage(u) => u.pts - u.pts_count,
2914 EditChannelMessage(u) => u.pts - u.pts_count,
2915 DeleteChannelMessages(u) => u.pts - u.pts_count,
2916 _ => 0,
2917 }
2918 }
2919
2920 async fn dispatch_updates(&self, body: &[u8]) {
2925 if body.len() < 4 {
2926 return;
2927 }
2928 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2929
2930 if cid == 0xe317af7e_u32 {
2932 tracing::warn!("[layer] updatesTooLong: getDifference");
2933 let c = self.clone();
2934 let utx = self.inner.update_tx.clone();
2935 tokio::spawn(async move {
2936 match c.get_difference().await {
2937 Ok(updates) => {
2938 for u in updates {
2939 if utx.try_send(u).is_err() {
2940 tracing::warn!("[layer] update channel full: dropping update");
2941 break;
2942 }
2943 }
2944 }
2945 Err(e) => tracing::warn!("[layer] getDifference after updatesTooLong: {e}"),
2946 }
2947 });
2948 return;
2949 }
2950
2951 if cid == 0x313bc7f8 {
2960 let mut cur = Cursor::from_slice(&body[4..]);
2962 let m = match tl::types::UpdateShortMessage::deserialize(&mut cur) {
2963 Ok(m) => m,
2964 Err(e) => {
2965 tracing::debug!("[layer] updateShortMessage deserialize error: {e}");
2966 return;
2967 }
2968 };
2969 let pts = m.pts;
2970 let pts_count = m.pts_count;
2971 let upd = update::Update::NewMessage(update::make_short_dm(m));
2972 let c = self.clone();
2973 let utx = self.inner.update_tx.clone();
2974 tokio::spawn(async move {
2975 match c
2976 .check_and_fill_gap(pts, pts_count, Some(attach_client_to_update(upd, &c)))
2977 .await
2978 {
2979 Ok(updates) => {
2980 for u in updates {
2981 if utx.try_send(u).is_err() {
2982 tracing::warn!("[layer] update channel full: dropping update");
2983 }
2984 }
2985 }
2986 Err(e) => tracing::warn!("[layer] updateShortMessage gap fill: {e}"),
2987 }
2988 });
2989 return;
2990 }
2991 if cid == 0x4d6deea5 {
2992 let mut cur = Cursor::from_slice(&body[4..]);
2994 let m = match tl::types::UpdateShortChatMessage::deserialize(&mut cur) {
2995 Ok(m) => m,
2996 Err(e) => {
2997 tracing::debug!("[layer] updateShortChatMessage deserialize error: {e}");
2998 return;
2999 }
3000 };
3001 let pts = m.pts;
3002 let pts_count = m.pts_count;
3003 let upd = update::Update::NewMessage(update::make_short_chat(m));
3004 let c = self.clone();
3005 let utx = self.inner.update_tx.clone();
3006 tokio::spawn(async move {
3007 match c
3008 .check_and_fill_gap(pts, pts_count, Some(attach_client_to_update(upd, &c)))
3009 .await
3010 {
3011 Ok(updates) => {
3012 for u in updates {
3013 if utx.try_send(u).is_err() {
3014 tracing::warn!("[layer] update channel full: dropping update");
3015 }
3016 }
3017 }
3018 Err(e) => tracing::warn!("[layer] updateShortChatMessage gap fill: {e}"),
3019 }
3020 });
3021 return;
3022 }
3023
3024 if cid == ID_UPDATE_SHORT_SENT_MSG {
3028 let mut cur = Cursor::from_slice(&body[4..]);
3029 match tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
3030 Ok(m) => {
3031 let pts = m.pts;
3032 let pts_count = m.pts_count;
3033 tracing::debug!(
3034 "[layer] updateShortSentMessage (push): pts={pts} pts_count={pts_count}: advancing pts"
3035 );
3036 let c = self.clone();
3037 let utx = self.inner.update_tx.clone();
3038 tokio::spawn(async move {
3039 match c.check_and_fill_gap(pts, pts_count, None).await {
3040 Ok(replayed) => {
3041 for u in replayed {
3042 if utx.try_send(u).is_err() {
3043 tracing::warn!(
3044 "[layer] update channel full: dropping update"
3045 );
3046 }
3047 }
3048 }
3049 Err(e) => tracing::warn!(
3050 "[layer] updateShortSentMessage push pts advance: {e}"
3051 ),
3052 }
3053 });
3054 }
3055 Err(e) => {
3056 tracing::debug!("[layer] updateShortSentMessage push deserialize error: {e}")
3057 }
3058 }
3059 return;
3060 }
3061
3062 use crate::pts::PtsCheckResult;
3069 use layer_tl_types::{Cursor, Deserializable};
3070
3071 struct ParsedContainer {
3077 seq_info: Option<(i32, i32)>,
3078 users: Vec<tl::enums::User>,
3079 chats: Vec<tl::enums::Chat>,
3080 updates: Vec<tl::enums::Update>,
3081 }
3082
3083 let mut cur = Cursor::from_slice(body);
3084 let parsed: ParsedContainer = match cid {
3085 0x74ae4240 => {
3086 match tl::enums::Updates::deserialize(&mut cur) {
3088 Ok(tl::enums::Updates::Updates(u)) => ParsedContainer {
3089 seq_info: Some((u.seq, u.seq)),
3090 users: u.users,
3091 chats: u.chats,
3092 updates: u.updates,
3093 },
3094 _ => ParsedContainer {
3095 seq_info: None,
3096 users: vec![],
3097 chats: vec![],
3098 updates: vec![],
3099 },
3100 }
3101 }
3102 0x725b04c3 => {
3103 match tl::enums::Updates::deserialize(&mut cur) {
3105 Ok(tl::enums::Updates::Combined(u)) => ParsedContainer {
3106 seq_info: Some((u.seq, u.seq_start)),
3107 users: u.users,
3108 chats: u.chats,
3109 updates: u.updates,
3110 },
3111 _ => ParsedContainer {
3112 seq_info: None,
3113 users: vec![],
3114 chats: vec![],
3115 updates: vec![],
3116 },
3117 }
3118 }
3119 0x78d4dec1 => {
3120 match tl::types::UpdateShort::deserialize(&mut Cursor::from_slice(body)) {
3122 Ok(u) => ParsedContainer {
3123 seq_info: None,
3124 users: vec![],
3125 chats: vec![],
3126 updates: vec![u.update],
3127 },
3128 Err(_) => ParsedContainer {
3129 seq_info: None,
3130 users: vec![],
3131 chats: vec![],
3132 updates: vec![],
3133 },
3134 }
3135 }
3136 _ => ParsedContainer {
3137 seq_info: None,
3138 users: vec![],
3139 chats: vec![],
3140 updates: vec![],
3141 },
3142 };
3143
3144 if !parsed.users.is_empty() || !parsed.chats.is_empty() {
3146 self.cache_users_and_chats(&parsed.users, &parsed.chats)
3147 .await;
3148 }
3149
3150 if let Some((seq, seq_start)) = parsed.seq_info
3152 && seq != 0
3153 {
3154 let result = self.inner.pts_state.lock().await.check_seq(seq, seq_start);
3155 match result {
3156 PtsCheckResult::Ok => {
3157 }
3159 PtsCheckResult::Duplicate => {
3160 tracing::debug!(
3162 "[layer] seq duplicate (seq={seq}, seq_start={seq_start}): dropping container"
3163 );
3164 return;
3165 }
3166 PtsCheckResult::Gap { expected, got } => {
3167 tracing::warn!(
3170 "[layer] seq gap: expected {expected}, got {got}: getDifference"
3171 );
3172 let c = self.clone();
3173 let utx = self.inner.update_tx.clone();
3174 tokio::spawn(async move {
3175 match c.get_difference().await {
3176 Ok(updates) => {
3177 for u in updates {
3178 if utx.try_send(u).is_err() {
3179 tracing::warn!(
3180 "[layer] update channel full: dropping seq gap update"
3181 );
3182 break;
3183 }
3184 }
3185 }
3186 Err(e) => tracing::warn!("[layer] seq gap fill: {e}"),
3187 }
3188 });
3189 return; }
3191 }
3192 }
3193
3194 let mut raw: Vec<tl::enums::Update> = parsed.updates;
3195
3196 raw.sort_by_key(Self::update_sort_key);
3201
3202 for upd in raw {
3203 self.dispatch_single_update(upd).await;
3204 }
3205
3206 if let Some((seq, _)) = parsed.seq_info
3212 && seq != 0
3213 {
3214 self.inner.pts_state.lock().await.advance_seq(seq);
3215 }
3216 }
3217
3218 async fn dispatch_single_update(&self, upd: tl::enums::Update) {
3221 enum Kind {
3224 GlobalPts {
3225 pts: i32,
3226 pts_count: i32,
3227 carry: bool,
3228 },
3229 ChannelPts {
3230 channel_id: i64,
3231 pts: i32,
3232 pts_count: i32,
3233 carry: bool,
3234 },
3235 Qts {
3236 qts: i32,
3237 },
3238 Passthrough,
3239 }
3240
3241 fn ch_from_msg(msg: &tl::enums::Message) -> i64 {
3242 if let tl::enums::Message::Message(m) = msg
3243 && let tl::enums::Peer::Channel(c) = &m.peer_id
3244 {
3245 return c.channel_id;
3246 }
3247 0
3248 }
3249
3250 let kind = {
3251 use tl::enums::Update::*;
3252 match &upd {
3253 NewMessage(u) => Kind::GlobalPts {
3254 pts: u.pts,
3255 pts_count: u.pts_count,
3256 carry: true,
3257 },
3258 EditMessage(u) => Kind::GlobalPts {
3259 pts: u.pts,
3260 pts_count: u.pts_count,
3261 carry: true,
3262 },
3263 DeleteMessages(u) => Kind::GlobalPts {
3264 pts: u.pts,
3265 pts_count: u.pts_count,
3266 carry: true,
3267 },
3268 ReadHistoryInbox(u) => Kind::GlobalPts {
3269 pts: u.pts,
3270 pts_count: u.pts_count,
3271 carry: false,
3272 },
3273 ReadHistoryOutbox(u) => Kind::GlobalPts {
3274 pts: u.pts,
3275 pts_count: u.pts_count,
3276 carry: false,
3277 },
3278 NewChannelMessage(u) => Kind::ChannelPts {
3279 channel_id: ch_from_msg(&u.message),
3280 pts: u.pts,
3281 pts_count: u.pts_count,
3282 carry: true,
3283 },
3284 EditChannelMessage(u) => Kind::ChannelPts {
3285 channel_id: ch_from_msg(&u.message),
3286 pts: u.pts,
3287 pts_count: u.pts_count,
3288 carry: true,
3289 },
3290 DeleteChannelMessages(u) => Kind::ChannelPts {
3291 channel_id: u.channel_id,
3292 pts: u.pts,
3293 pts_count: u.pts_count,
3294 carry: true,
3295 },
3296 NewEncryptedMessage(u) => Kind::Qts { qts: u.qts },
3297 _ => Kind::Passthrough,
3298 }
3299 };
3300
3301 let high = update::from_single_update_pub(upd);
3302
3303 let to_send: Vec<update::Update> = match kind {
3304 Kind::GlobalPts {
3305 pts,
3306 pts_count,
3307 carry,
3308 } => {
3309 let first = if carry { high.into_iter().next() } else { None };
3310 let c = self.clone();
3314 let utx = self.inner.update_tx.clone();
3315 tokio::spawn(async move {
3316 match c.check_and_fill_gap(pts, pts_count, first).await {
3317 Ok(v) => {
3318 for u in v {
3319 let u = attach_client_to_update(u, &c);
3320 if utx.try_send(u).is_err() {
3321 tracing::warn!("[layer] update channel full: dropping update");
3322 break;
3323 }
3324 }
3325 }
3326 Err(e) => tracing::warn!("[layer] pts gap: {e}"),
3327 }
3328 });
3329 vec![]
3330 }
3331 Kind::ChannelPts {
3332 channel_id,
3333 pts,
3334 pts_count,
3335 carry,
3336 } => {
3337 let first = if carry { high.into_iter().next() } else { None };
3338 if channel_id != 0 {
3339 let c = self.clone();
3341 let utx = self.inner.update_tx.clone();
3342 tokio::spawn(async move {
3343 match c
3344 .check_and_fill_channel_gap(channel_id, pts, pts_count, first)
3345 .await
3346 {
3347 Ok(v) => {
3348 for u in v {
3349 let u = attach_client_to_update(u, &c);
3350 if utx.try_send(u).is_err() {
3351 tracing::warn!(
3352 "[layer] update channel full: dropping update"
3353 );
3354 break;
3355 }
3356 }
3357 }
3358 Err(e) => tracing::warn!("[layer] ch pts gap: {e}"),
3359 }
3360 });
3361 vec![]
3362 } else {
3363 first.into_iter().collect()
3364 }
3365 }
3366 Kind::Qts { qts } => {
3367 let c = self.clone();
3369 tokio::spawn(async move {
3370 if let Err(e) = c.check_and_fill_qts_gap(qts, 1).await {
3371 tracing::warn!("[layer] qts gap: {e}");
3372 }
3373 });
3374 vec![]
3375 }
3376 Kind::Passthrough => high
3377 .into_iter()
3378 .map(|u| match u {
3379 update::Update::NewMessage(msg) => {
3380 update::Update::NewMessage(msg.with_client(self.clone()))
3381 }
3382 update::Update::MessageEdited(msg) => {
3383 update::Update::MessageEdited(msg.with_client(self.clone()))
3384 }
3385 other => other,
3386 })
3387 .collect(),
3388 };
3389
3390 for u in to_send {
3391 if self.inner.update_tx.try_send(u).is_err() {
3392 tracing::warn!("[layer] update channel full: dropping update");
3393 }
3394 }
3395 }
3396
3397 async fn do_reconnect_loop(
3407 &self,
3408 initial_delay_ms: u64,
3409 rh: &mut OwnedReadHalf,
3410 fk: &mut FrameKind,
3411 ak: &mut [u8; 256],
3412 sid: &mut i64,
3413 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
3414 ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
3415 let mut delay_ms = if initial_delay_ms == 0 {
3416 0
3419 } else {
3420 initial_delay_ms.max(RECONNECT_BASE_MS)
3421 };
3422 loop {
3423 tracing::debug!("[layer] Reconnecting in {delay_ms} ms …");
3424 tokio::select! {
3425 _ = sleep(Duration::from_millis(delay_ms)) => {}
3426 hint = network_hint_rx.recv() => {
3427 hint?; tracing::debug!("[layer] Network hint → skipping backoff, reconnecting now");
3429 }
3430 }
3431
3432 match self.do_reconnect(ak, fk).await {
3433 Ok((new_rh, new_fk, new_ak, new_sid)) => {
3434 *rh = new_rh;
3435 *fk = new_fk;
3436 *ak = new_ak;
3437 *sid = new_sid;
3438 tracing::debug!("[layer] TCP reconnected ✓: initialising session …");
3439
3440 let (init_tx, init_rx) = oneshot::channel();
3444 let c = self.clone();
3445 let utx = self.inner.update_tx.clone();
3446 tokio::spawn(async move {
3447 let result = {
3452 let mut attempt = 0u32;
3453 const MAX_ATTEMPTS: u32 = 5;
3454 loop {
3455 match c.init_connection().await {
3456 Ok(()) => break Ok(()),
3457 Err(InvocationError::Rpc(ref r))
3458 if r.flood_wait_seconds().is_some() =>
3459 {
3460 let secs = r.flood_wait_seconds().unwrap();
3461 tracing::warn!(
3462 "[layer] init_connection FLOOD_WAIT_{secs}: \
3463 waiting before retry"
3464 );
3465 sleep(Duration::from_secs(secs + 1)).await;
3466 }
3467 Err(InvocationError::Rpc(ref r))
3468 if r.code == 401 && attempt < MAX_ATTEMPTS =>
3469 {
3470 let delay = Duration::from_millis(500 * (1u64 << attempt));
3471 tracing::warn!(
3472 "[layer] init_connection AUTH_KEY_UNREGISTERED \
3473 (attempt {}/{MAX_ATTEMPTS}): retrying in {delay:?}",
3474 attempt + 1,
3475 );
3476 sleep(delay).await;
3477 attempt += 1;
3478 }
3479 Err(e) => break Err(e),
3480 }
3481 }
3482 };
3483 if result.is_ok() {
3484 let missed = {
3489 let mut attempt = 0u32;
3490 const MAX_ATTEMPTS: u32 = 5;
3491 loop {
3492 match c.get_difference().await {
3493 Ok(updates) => break updates,
3494 Err(ref e)
3495 if matches!(e,
3496 InvocationError::Rpc(r) if r.code == 401)
3497 && attempt < MAX_ATTEMPTS =>
3498 {
3499 let delay =
3500 Duration::from_millis(500 * (1u64 << attempt));
3501 tracing::warn!(
3502 "[layer] getDifference AUTH_KEY_UNREGISTERED \
3503 (attempt {}/{MAX_ATTEMPTS}): retrying in \
3504 {delay:?}",
3505 attempt + 1,
3506 );
3507 sleep(delay).await;
3508 attempt += 1;
3509 }
3510 Err(ref e)
3511 if matches!(e,
3512 InvocationError::Rpc(r) if r.code == 401) =>
3513 {
3514 tracing::warn!(
3515 "[layer] getDifference AUTH_KEY_UNREGISTERED \
3516 after {MAX_ATTEMPTS} retries: falling back \
3517 to sync_pts_state"
3518 );
3519 let _ = c.sync_pts_state().await;
3520 break vec![];
3521 }
3522 Err(e) => {
3523 tracing::warn!(
3524 "[layer] getDifference failed after reconnect: \
3525 {e}"
3526 );
3527 break vec![];
3528 }
3529 }
3530 }
3531 };
3532 for u in missed {
3533 if utx.try_send(attach_client_to_update(u, &c)).is_err() {
3534 tracing::warn!(
3535 "[layer] update channel full: dropping catch-up update"
3536 );
3537 break;
3538 }
3539 }
3540 }
3541 let _ = init_tx.send(result);
3542 });
3543 return Some(init_rx);
3544 }
3545 Err(e) => {
3546 tracing::warn!("[layer] Reconnect attempt failed: {e}");
3547 let next = delay_ms
3551 .saturating_mul(2)
3552 .clamp(RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1_000);
3553 delay_ms = jitter_delay(next).as_millis() as u64;
3554 }
3555 }
3556 }
3557 }
3558
3559 async fn do_reconnect(
3561 &self,
3562 _old_auth_key: &[u8; 256],
3563 _old_frame_kind: &FrameKind,
3564 ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
3565 let home_dc_id = *self.inner.home_dc_id.lock().await;
3566 let (addr, saved_key, first_salt, time_offset) = {
3567 let opts = self.inner.dc_options.lock().await;
3568 match opts.get(&home_dc_id) {
3569 Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
3570 None => (
3571 crate::dc_migration::fallback_dc_addr(home_dc_id).to_string(),
3572 None,
3573 0,
3574 0,
3575 ),
3576 }
3577 };
3578 let socks5 = self.inner.socks5.clone();
3579 let mtproxy = self.inner.mtproxy.clone();
3580 let transport = self.inner.transport.clone();
3581
3582 let new_conn = if let Some(key) = saved_key {
3583 tracing::debug!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
3584 match Connection::connect_with_key(
3585 &addr,
3586 key,
3587 first_salt,
3588 time_offset,
3589 socks5.as_ref(),
3590 &transport,
3591 home_dc_id as i16,
3592 )
3593 .await
3594 {
3595 Ok(c) => c,
3596 Err(e) => {
3597 return Err(e);
3598 }
3599 }
3600 } else {
3601 Connection::connect_raw(
3602 &addr,
3603 socks5.as_ref(),
3604 mtproxy.as_ref(),
3605 &transport,
3606 home_dc_id as i16,
3607 )
3608 .await?
3609 };
3610
3611 let (new_writer, new_wh, new_read, new_fk) = new_conn.into_writer();
3612 let new_ak = new_writer.enc.auth_key_bytes();
3613 let new_sid = new_writer.enc.session_id();
3614 *self.inner.writer.lock().await = new_writer;
3615 *self.inner.write_half.lock().await = new_wh;
3616
3617 self.inner
3625 .salt_request_in_flight
3626 .store(false, std::sync::atomic::Ordering::SeqCst);
3627
3628 {
3633 let mut opts = self.inner.dc_options.lock().await;
3634 if let Some(entry) = opts.get_mut(&home_dc_id) {
3635 entry.auth_key = Some(new_ak);
3636 }
3637 }
3638
3639 Ok((new_read, new_fk, new_ak, new_sid))
3651 }
3652
3653 pub async fn send_message(
3657 &self,
3658 peer: &str,
3659 text: &str,
3660 ) -> Result<update::IncomingMessage, InvocationError> {
3661 let p = self.resolve_peer(peer).await?;
3662 self.send_message_to_peer(p, text).await
3663 }
3664
3665 pub async fn send_message_to_peer(
3670 &self,
3671 peer: impl Into<PeerRef>,
3672 text: &str,
3673 ) -> Result<update::IncomingMessage, InvocationError> {
3674 self.send_message_to_peer_ex(peer, &InputMessage::text(text))
3675 .await
3676 }
3677
3678 pub async fn send_message_to_peer_ex(
3683 &self,
3684 peer: impl Into<PeerRef>,
3685 msg: &InputMessage,
3686 ) -> Result<update::IncomingMessage, InvocationError> {
3687 let peer = peer.into().resolve(self).await?;
3688 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
3689 let schedule = if msg.schedule_once_online {
3690 Some(0x7FFF_FFFEi32)
3691 } else {
3692 msg.schedule_date
3693 };
3694
3695 if let Some(media) = &msg.media {
3697 let req = tl::functions::messages::SendMedia {
3698 silent: msg.silent,
3699 background: msg.background,
3700 clear_draft: msg.clear_draft,
3701 noforwards: false,
3702 update_stickersets_order: false,
3703 invert_media: msg.invert_media,
3704 allow_paid_floodskip: false,
3705 peer: input_peer,
3706 reply_to: msg.reply_header(),
3707 media: media.clone(),
3708 message: msg.text.clone(),
3709 random_id: random_i64(),
3710 reply_markup: msg.reply_markup.clone(),
3711 entities: msg.entities.clone(),
3712 schedule_date: schedule,
3713 schedule_repeat_period: None,
3714 send_as: None,
3715 quick_reply_shortcut: None,
3716 effect: None,
3717 allow_paid_stars: None,
3718 suggested_post: None,
3719 };
3720 let body = self.rpc_call_raw_pub(&req).await?;
3721 return Ok(self.extract_sent_message(&body, msg, &peer));
3722 }
3723
3724 let req = tl::functions::messages::SendMessage {
3725 no_webpage: msg.no_webpage,
3726 silent: msg.silent,
3727 background: msg.background,
3728 clear_draft: msg.clear_draft,
3729 noforwards: false,
3730 update_stickersets_order: false,
3731 invert_media: msg.invert_media,
3732 allow_paid_floodskip: false,
3733 peer: input_peer,
3734 reply_to: msg.reply_header(),
3735 message: msg.text.clone(),
3736 random_id: random_i64(),
3737 reply_markup: msg.reply_markup.clone(),
3738 entities: msg.entities.clone(),
3739 schedule_date: schedule,
3740 schedule_repeat_period: None,
3741 send_as: None,
3742 quick_reply_shortcut: None,
3743 effect: None,
3744 allow_paid_stars: None,
3745 suggested_post: None,
3746 };
3747 let body = self.rpc_call_raw(&req).await?;
3748 Ok(self.extract_sent_message(&body, msg, &peer))
3749 }
3750
3751 pub async fn send_message_with_input_peer(
3755 &self,
3756 input_peer: tl::enums::InputPeer,
3757 msg: &InputMessage,
3758 ) -> Result<(), InvocationError> {
3759 let schedule = if msg.schedule_once_online {
3760 Some(0x7FFF_FFFEi32)
3761 } else {
3762 msg.schedule_date
3763 };
3764
3765 if let Some(media) = &msg.media {
3766 let req = tl::functions::messages::SendMedia {
3767 silent: msg.silent,
3768 background: msg.background,
3769 clear_draft: msg.clear_draft,
3770 noforwards: false,
3771 update_stickersets_order: false,
3772 invert_media: msg.invert_media,
3773 allow_paid_floodskip: false,
3774 peer: input_peer,
3775 reply_to: msg.reply_header(),
3776 media: media.clone(),
3777 message: msg.text.clone(),
3778 random_id: random_i64(),
3779 reply_markup: msg.reply_markup.clone(),
3780 entities: msg.entities.clone(),
3781 schedule_date: schedule,
3782 schedule_repeat_period: None,
3783 send_as: None,
3784 quick_reply_shortcut: None,
3785 effect: None,
3786 allow_paid_stars: None,
3787 suggested_post: None,
3788 };
3789 self.rpc_call_raw_pub(&req).await?;
3790 return Ok(());
3791 }
3792
3793 let req = tl::functions::messages::SendMessage {
3794 no_webpage: msg.no_webpage,
3795 silent: msg.silent,
3796 background: msg.background,
3797 clear_draft: msg.clear_draft,
3798 noforwards: false,
3799 update_stickersets_order: false,
3800 invert_media: msg.invert_media,
3801 allow_paid_floodskip: false,
3802 peer: input_peer,
3803 reply_to: msg.reply_header(),
3804 message: msg.text.clone(),
3805 random_id: random_i64(),
3806 reply_markup: msg.reply_markup.clone(),
3807 entities: msg.entities.clone(),
3808 schedule_date: schedule,
3809 schedule_repeat_period: None,
3810 send_as: None,
3811 quick_reply_shortcut: None,
3812 effect: None,
3813 allow_paid_stars: None,
3814 suggested_post: None,
3815 };
3816 self.rpc_call_raw(&req).await?;
3817 Ok(())
3818 }
3819
3820 fn extract_sent_message(
3824 &self,
3825 body: &[u8],
3826 input: &InputMessage,
3827 peer: &tl::enums::Peer,
3828 ) -> update::IncomingMessage {
3829 if body.len() < 4 {
3830 return self.synthetic_sent(input, peer, 0, 0);
3831 }
3832 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
3833
3834 if cid == 0x74ae4240 || cid == 0x725b04c3 {
3836 let mut cur = Cursor::from_slice(body);
3837 if let Ok(tl::enums::Updates::Updates(u)) = tl::enums::Updates::deserialize(&mut cur) {
3838 for upd in &u.updates {
3839 if let tl::enums::Update::NewMessage(nm) = upd {
3840 return update::IncomingMessage::from_raw(nm.message.clone())
3841 .with_client(self.clone());
3842 }
3843 if let tl::enums::Update::NewChannelMessage(nm) = upd {
3844 return update::IncomingMessage::from_raw(nm.message.clone())
3845 .with_client(self.clone());
3846 }
3847 }
3848 }
3849 if let Ok(tl::enums::Updates::Combined(u)) =
3850 tl::enums::Updates::deserialize(&mut Cursor::from_slice(body))
3851 {
3852 for upd in &u.updates {
3853 if let tl::enums::Update::NewMessage(nm) = upd {
3854 return update::IncomingMessage::from_raw(nm.message.clone())
3855 .with_client(self.clone());
3856 }
3857 if let tl::enums::Update::NewChannelMessage(nm) = upd {
3858 return update::IncomingMessage::from_raw(nm.message.clone())
3859 .with_client(self.clone());
3860 }
3861 }
3862 }
3863 }
3864
3865 if cid == 0x9015e101 {
3868 let mut cur = Cursor::from_slice(&body[4..]);
3869 if let Ok(sent) = tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
3870 return self.synthetic_sent_from_short(sent, input, peer);
3871 }
3872 }
3873
3874 if cid == 0x313bc7f8 {
3876 let mut cur = Cursor::from_slice(&body[4..]);
3877 if let Ok(m) = tl::types::UpdateShortMessage::deserialize(&mut cur) {
3878 let msg = tl::types::Message {
3879 out: m.out,
3880 mentioned: m.mentioned,
3881 media_unread: m.media_unread,
3882 silent: m.silent,
3883 post: false,
3884 from_scheduled: false,
3885 legacy: false,
3886 edit_hide: false,
3887 pinned: false,
3888 noforwards: false,
3889 invert_media: false,
3890 offline: false,
3891 video_processing_pending: false,
3892 paid_suggested_post_stars: false,
3893 paid_suggested_post_ton: false,
3894 id: m.id,
3895 from_id: Some(tl::enums::Peer::User(tl::types::PeerUser {
3896 user_id: m.user_id,
3897 })),
3898 peer_id: tl::enums::Peer::User(tl::types::PeerUser { user_id: m.user_id }),
3899 saved_peer_id: None,
3900 fwd_from: m.fwd_from,
3901 via_bot_id: m.via_bot_id,
3902 via_business_bot_id: None,
3903 reply_to: m.reply_to,
3904 date: m.date,
3905 message: m.message,
3906 media: None,
3907 reply_markup: None,
3908 entities: m.entities,
3909 views: None,
3910 forwards: None,
3911 replies: None,
3912 edit_date: None,
3913 post_author: None,
3914 grouped_id: None,
3915 reactions: None,
3916 restriction_reason: None,
3917 ttl_period: None,
3918 quick_reply_shortcut_id: None,
3919 effect: None,
3920 factcheck: None,
3921 report_delivery_until_date: None,
3922 paid_message_stars: None,
3923 suggested_post: None,
3924 from_rank: None,
3925 from_boosts_applied: None,
3926 schedule_repeat_period: None,
3927 summary_from_language: None,
3928 };
3929 return update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
3930 .with_client(self.clone());
3931 }
3932 }
3933
3934 self.synthetic_sent(input, peer, 0, 0)
3936 }
3937
3938 fn synthetic_sent_from_short(
3940 &self,
3941 sent: tl::types::UpdateShortSentMessage,
3942 input: &InputMessage,
3943 peer: &tl::enums::Peer,
3944 ) -> update::IncomingMessage {
3945 let msg = tl::types::Message {
3946 out: sent.out,
3947 mentioned: false,
3948 media_unread: false,
3949 silent: input.silent,
3950 post: false,
3951 from_scheduled: false,
3952 legacy: false,
3953 edit_hide: false,
3954 pinned: false,
3955 noforwards: false,
3956 invert_media: input.invert_media,
3957 offline: false,
3958 video_processing_pending: false,
3959 paid_suggested_post_stars: false,
3960 paid_suggested_post_ton: false,
3961 id: sent.id,
3962 from_id: None,
3963 from_boosts_applied: None,
3964 from_rank: None,
3965 peer_id: peer.clone(),
3966 saved_peer_id: None,
3967 fwd_from: None,
3968 via_bot_id: None,
3969 via_business_bot_id: None,
3970 reply_to: input.reply_to.map(|id| {
3971 tl::enums::MessageReplyHeader::MessageReplyHeader(tl::types::MessageReplyHeader {
3972 reply_to_scheduled: false,
3973 forum_topic: false,
3974 quote: false,
3975 reply_to_msg_id: Some(id),
3976 reply_to_peer_id: None,
3977 reply_from: None,
3978 reply_media: None,
3979 reply_to_top_id: None,
3980 quote_text: None,
3981 quote_entities: None,
3982 quote_offset: None,
3983 todo_item_id: None,
3984 poll_option: None,
3985 })
3986 }),
3987 date: sent.date,
3988 message: input.text.clone(),
3989 media: sent.media,
3990 reply_markup: input.reply_markup.clone(),
3991 entities: sent.entities,
3992 views: None,
3993 forwards: None,
3994 replies: None,
3995 edit_date: None,
3996 post_author: None,
3997 grouped_id: None,
3998 reactions: None,
3999 restriction_reason: None,
4000 ttl_period: sent.ttl_period,
4001 quick_reply_shortcut_id: None,
4002 effect: None,
4003 factcheck: None,
4004 report_delivery_until_date: None,
4005 paid_message_stars: None,
4006 suggested_post: None,
4007 schedule_repeat_period: None,
4008 summary_from_language: None,
4009 };
4010 update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
4011 .with_client(self.clone())
4012 }
4013
4014 fn synthetic_sent(
4016 &self,
4017 input: &InputMessage,
4018 peer: &tl::enums::Peer,
4019 id: i32,
4020 date: i32,
4021 ) -> update::IncomingMessage {
4022 let msg = tl::types::Message {
4023 out: true,
4024 mentioned: false,
4025 media_unread: false,
4026 silent: input.silent,
4027 post: false,
4028 from_scheduled: false,
4029 legacy: false,
4030 edit_hide: false,
4031 pinned: false,
4032 noforwards: false,
4033 invert_media: input.invert_media,
4034 offline: false,
4035 video_processing_pending: false,
4036 paid_suggested_post_stars: false,
4037 paid_suggested_post_ton: false,
4038 id,
4039 from_id: None,
4040 from_boosts_applied: None,
4041 from_rank: None,
4042 peer_id: peer.clone(),
4043 saved_peer_id: None,
4044 fwd_from: None,
4045 via_bot_id: None,
4046 via_business_bot_id: None,
4047 reply_to: input.reply_to.map(|rid| {
4048 tl::enums::MessageReplyHeader::MessageReplyHeader(tl::types::MessageReplyHeader {
4049 reply_to_scheduled: false,
4050 forum_topic: false,
4051 quote: false,
4052 reply_to_msg_id: Some(rid),
4053 reply_to_peer_id: None,
4054 reply_from: None,
4055 reply_media: None,
4056 reply_to_top_id: None,
4057 quote_text: None,
4058 quote_entities: None,
4059 quote_offset: None,
4060 todo_item_id: None,
4061 poll_option: None,
4062 })
4063 }),
4064 date,
4065 message: input.text.clone(),
4066 media: None,
4067 reply_markup: input.reply_markup.clone(),
4068 entities: input.entities.clone(),
4069 views: None,
4070 forwards: None,
4071 replies: None,
4072 edit_date: None,
4073 post_author: None,
4074 grouped_id: None,
4075 reactions: None,
4076 restriction_reason: None,
4077 ttl_period: None,
4078 quick_reply_shortcut_id: None,
4079 effect: None,
4080 factcheck: None,
4081 report_delivery_until_date: None,
4082 paid_message_stars: None,
4083 suggested_post: None,
4084 schedule_repeat_period: None,
4085 summary_from_language: None,
4086 };
4087 update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
4088 .with_client(self.clone())
4089 }
4090
4091 pub async fn send_to_self(
4093 &self,
4094 text: &str,
4095 ) -> Result<update::IncomingMessage, InvocationError> {
4096 let req = tl::functions::messages::SendMessage {
4097 no_webpage: false,
4098 silent: false,
4099 background: false,
4100 clear_draft: false,
4101 noforwards: false,
4102 update_stickersets_order: false,
4103 invert_media: false,
4104 allow_paid_floodskip: false,
4105 peer: tl::enums::InputPeer::PeerSelf,
4106 reply_to: None,
4107 message: text.to_string(),
4108 random_id: random_i64(),
4109 reply_markup: None,
4110 entities: None,
4111 schedule_date: None,
4112 schedule_repeat_period: None,
4113 send_as: None,
4114 quick_reply_shortcut: None,
4115 effect: None,
4116 allow_paid_stars: None,
4117 suggested_post: None,
4118 };
4119 let body = self.rpc_call_raw(&req).await?;
4120 let self_peer = tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 });
4121 Ok(self.extract_sent_message(&body, &InputMessage::text(text), &self_peer))
4122 }
4123
4124 pub async fn edit_message(
4126 &self,
4127 peer: impl Into<PeerRef>,
4128 message_id: i32,
4129 new_text: &str,
4130 ) -> Result<(), InvocationError> {
4131 let peer = peer.into().resolve(self).await?;
4132 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4133 let req = tl::functions::messages::EditMessage {
4134 no_webpage: false,
4135 invert_media: false,
4136 peer: input_peer,
4137 id: message_id,
4138 message: Some(new_text.to_string()),
4139 media: None,
4140 reply_markup: None,
4141 entities: None,
4142 schedule_date: None,
4143 schedule_repeat_period: None,
4144 quick_reply_shortcut_id: None,
4145 };
4146 self.rpc_write(&req).await
4147 }
4148
4149 pub async fn forward_messages(
4151 &self,
4152 destination: impl Into<PeerRef>,
4153 message_ids: &[i32],
4154 source: impl Into<PeerRef>,
4155 ) -> Result<(), InvocationError> {
4156 let dest = destination.into().resolve(self).await?;
4157 let src = source.into().resolve(self).await?;
4158 let cache = self.inner.peer_cache.read().await;
4159 let to_peer = cache.peer_to_input(&dest);
4160 let from_peer = cache.peer_to_input(&src);
4161 drop(cache);
4162
4163 let req = tl::functions::messages::ForwardMessages {
4164 silent: false,
4165 background: false,
4166 with_my_score: false,
4167 drop_author: false,
4168 drop_media_captions: false,
4169 noforwards: false,
4170 from_peer,
4171 id: message_ids.to_vec(),
4172 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
4173 to_peer,
4174 top_msg_id: None,
4175 reply_to: None,
4176 schedule_date: None,
4177 schedule_repeat_period: None,
4178 send_as: None,
4179 quick_reply_shortcut: None,
4180 effect: None,
4181 video_timestamp: None,
4182 allow_paid_stars: None,
4183 allow_paid_floodskip: false,
4184 suggested_post: None,
4185 };
4186 self.rpc_write(&req).await
4187 }
4188
4189 pub async fn forward_messages_returning(
4194 &self,
4195 destination: impl Into<PeerRef>,
4196 message_ids: &[i32],
4197 source: impl Into<PeerRef>,
4198 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4199 let dest = destination.into().resolve(self).await?;
4200 let src = source.into().resolve(self).await?;
4201 let cache = self.inner.peer_cache.read().await;
4202 let to_peer = cache.peer_to_input(&dest);
4203 let from_peer = cache.peer_to_input(&src);
4204 drop(cache);
4205
4206 let req = tl::functions::messages::ForwardMessages {
4207 silent: false,
4208 background: false,
4209 with_my_score: false,
4210 drop_author: false,
4211 drop_media_captions: false,
4212 noforwards: false,
4213 from_peer,
4214 id: message_ids.to_vec(),
4215 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
4216 to_peer,
4217 top_msg_id: None,
4218 reply_to: None,
4219 schedule_date: None,
4220 schedule_repeat_period: None,
4221 send_as: None,
4222 quick_reply_shortcut: None,
4223 effect: None,
4224 video_timestamp: None,
4225 allow_paid_stars: None,
4226 allow_paid_floodskip: false,
4227 suggested_post: None,
4228 };
4229 let body = self.rpc_call_raw(&req).await?;
4230 let mut out = Vec::new();
4232 if body.len() >= 4 {
4233 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
4234 if cid == 0x74ae4240 || cid == 0x725b04c3 {
4235 let mut cur = Cursor::from_slice(&body);
4236 let updates_opt = tl::enums::Updates::deserialize(&mut cur).ok();
4237 let raw_updates = match updates_opt {
4238 Some(tl::enums::Updates::Updates(u)) => u.updates,
4239 Some(tl::enums::Updates::Combined(u)) => u.updates,
4240 _ => vec![],
4241 };
4242 for upd in raw_updates {
4243 match upd {
4244 tl::enums::Update::NewMessage(u) => {
4245 out.push(
4246 update::IncomingMessage::from_raw(u.message)
4247 .with_client(self.clone()),
4248 );
4249 }
4250 tl::enums::Update::NewChannelMessage(u) => {
4251 out.push(
4252 update::IncomingMessage::from_raw(u.message)
4253 .with_client(self.clone()),
4254 );
4255 }
4256 _ => {}
4257 }
4258 }
4259 }
4260 }
4261 Ok(out)
4262 }
4263
4264 pub async fn delete_messages(
4266 &self,
4267 message_ids: Vec<i32>,
4268 revoke: bool,
4269 ) -> Result<(), InvocationError> {
4270 let req = tl::functions::messages::DeleteMessages {
4271 revoke,
4272 id: message_ids,
4273 };
4274 self.rpc_write(&req).await
4275 }
4276
4277 pub async fn get_messages_by_id(
4279 &self,
4280 peer: impl Into<PeerRef>,
4281 ids: &[i32],
4282 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4283 let peer = peer.into().resolve(self).await?;
4284 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4285 let id_list: Vec<tl::enums::InputMessage> = ids
4286 .iter()
4287 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
4288 .collect();
4289 let req = tl::functions::channels::GetMessages {
4290 channel: match &input_peer {
4291 tl::enums::InputPeer::Channel(c) => {
4292 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4293 channel_id: c.channel_id,
4294 access_hash: c.access_hash,
4295 })
4296 }
4297 _ => return self.get_messages_user(input_peer, id_list).await,
4298 },
4299 id: id_list,
4300 };
4301 let body = self.rpc_call_raw(&req).await?;
4302 let mut cur = Cursor::from_slice(&body);
4303 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4304 tl::enums::messages::Messages::Messages(m) => m.messages,
4305 tl::enums::messages::Messages::Slice(m) => m.messages,
4306 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4307 tl::enums::messages::Messages::NotModified(_) => vec![],
4308 };
4309 Ok(msgs
4310 .into_iter()
4311 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4312 .collect())
4313 }
4314
4315 async fn get_messages_user(
4316 &self,
4317 _peer: tl::enums::InputPeer,
4318 ids: Vec<tl::enums::InputMessage>,
4319 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4320 let req = tl::functions::messages::GetMessages { id: ids };
4321 let body = self.rpc_call_raw(&req).await?;
4322 let mut cur = Cursor::from_slice(&body);
4323 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4324 tl::enums::messages::Messages::Messages(m) => m.messages,
4325 tl::enums::messages::Messages::Slice(m) => m.messages,
4326 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4327 tl::enums::messages::Messages::NotModified(_) => vec![],
4328 };
4329 Ok(msgs
4330 .into_iter()
4331 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4332 .collect())
4333 }
4334
4335 pub async fn get_pinned_message(
4337 &self,
4338 peer: impl Into<PeerRef>,
4339 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
4340 let peer = peer.into().resolve(self).await?;
4341 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4342 let req = tl::functions::messages::Search {
4343 peer: input_peer,
4344 q: String::new(),
4345 from_id: None,
4346 saved_peer_id: None,
4347 saved_reaction: None,
4348 top_msg_id: None,
4349 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
4350 min_date: 0,
4351 max_date: 0,
4352 offset_id: 0,
4353 add_offset: 0,
4354 limit: 1,
4355 max_id: 0,
4356 min_id: 0,
4357 hash: 0,
4358 };
4359 let body = self.rpc_call_raw(&req).await?;
4360 let mut cur = Cursor::from_slice(&body);
4361 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4362 tl::enums::messages::Messages::Messages(m) => m.messages,
4363 tl::enums::messages::Messages::Slice(m) => m.messages,
4364 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4365 tl::enums::messages::Messages::NotModified(_) => vec![],
4366 };
4367 Ok(msgs
4368 .into_iter()
4369 .next()
4370 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone())))
4371 }
4372
4373 pub async fn pin_message(
4375 &self,
4376 peer: impl Into<PeerRef>,
4377 message_id: i32,
4378 silent: bool,
4379 unpin: bool,
4380 pm_oneside: bool,
4381 ) -> Result<(), InvocationError> {
4382 let peer = peer.into().resolve(self).await?;
4383 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4384 let req = tl::functions::messages::UpdatePinnedMessage {
4385 silent,
4386 unpin,
4387 pm_oneside,
4388 peer: input_peer,
4389 id: message_id,
4390 };
4391 self.rpc_write(&req).await
4392 }
4393
4394 pub async fn unpin_message(
4396 &self,
4397 peer: impl Into<PeerRef>,
4398 message_id: i32,
4399 ) -> Result<(), InvocationError> {
4400 self.pin_message(peer, message_id, true, true, false).await
4401 }
4402
4403 pub async fn get_reply_to_message(
4418 &self,
4419 message: &update::IncomingMessage,
4420 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
4421 let reply_id = match message.reply_to_message_id() {
4422 Some(id) => id,
4423 None => return Ok(None),
4424 };
4425 let peer = match message.peer_id() {
4426 Some(p) => p.clone(),
4427 None => return Ok(None),
4428 };
4429 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4430 let id = vec![tl::enums::InputMessage::Id(tl::types::InputMessageId {
4431 id: reply_id,
4432 })];
4433
4434 let result = match &input_peer {
4435 tl::enums::InputPeer::Channel(c) => {
4436 let req = tl::functions::channels::GetMessages {
4437 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4438 channel_id: c.channel_id,
4439 access_hash: c.access_hash,
4440 }),
4441 id,
4442 };
4443 self.rpc_call_raw(&req).await?
4444 }
4445 _ => {
4446 let req = tl::functions::messages::GetMessages { id };
4447 self.rpc_call_raw(&req).await?
4448 }
4449 };
4450
4451 let mut cur = Cursor::from_slice(&result);
4452 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4453 tl::enums::messages::Messages::Messages(m) => m.messages,
4454 tl::enums::messages::Messages::Slice(m) => m.messages,
4455 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4456 tl::enums::messages::Messages::NotModified(_) => vec![],
4457 };
4458 Ok(msgs
4459 .into_iter()
4460 .next()
4461 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone())))
4462 }
4463
4464 pub async fn unpin_all_messages(
4466 &self,
4467 peer: impl Into<PeerRef>,
4468 ) -> Result<(), InvocationError> {
4469 let peer = peer.into().resolve(self).await?;
4470 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4471 let req = tl::functions::messages::UnpinAllMessages {
4472 peer: input_peer,
4473 top_msg_id: None,
4474 saved_peer_id: None,
4475 };
4476 self.rpc_write(&req).await
4477 }
4478
4479 pub async fn search_messages(
4484 &self,
4485 peer: impl Into<PeerRef>,
4486 query: &str,
4487 limit: i32,
4488 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4489 self.search(peer, query).limit(limit).fetch(self).await
4490 }
4491
4492 pub fn search(&self, peer: impl Into<PeerRef>, query: &str) -> SearchBuilder {
4494 SearchBuilder::new(peer.into(), query.to_string())
4495 }
4496
4497 pub async fn search_global(
4499 &self,
4500 query: &str,
4501 limit: i32,
4502 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4503 self.search_global_builder(query)
4504 .limit(limit)
4505 .fetch(self)
4506 .await
4507 }
4508
4509 pub fn search_global_builder(&self, query: &str) -> GlobalSearchBuilder {
4511 GlobalSearchBuilder::new(query.to_string())
4512 }
4513
4514 pub async fn get_scheduled_messages(
4531 &self,
4532 peer: impl Into<PeerRef>,
4533 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4534 let peer = peer.into().resolve(self).await?;
4535 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4536 let req = tl::functions::messages::GetScheduledHistory {
4537 peer: input_peer,
4538 hash: 0,
4539 };
4540 let body = self.rpc_call_raw(&req).await?;
4541 let mut cur = Cursor::from_slice(&body);
4542 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4543 tl::enums::messages::Messages::Messages(m) => m.messages,
4544 tl::enums::messages::Messages::Slice(m) => m.messages,
4545 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4546 tl::enums::messages::Messages::NotModified(_) => vec![],
4547 };
4548 Ok(msgs
4549 .into_iter()
4550 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4551 .collect())
4552 }
4553
4554 pub async fn delete_scheduled_messages(
4556 &self,
4557 peer: impl Into<PeerRef>,
4558 ids: Vec<i32>,
4559 ) -> Result<(), InvocationError> {
4560 let peer = peer.into().resolve(self).await?;
4561 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4562 let req = tl::functions::messages::DeleteScheduledMessages {
4563 peer: input_peer,
4564 id: ids,
4565 };
4566 self.rpc_write(&req).await
4567 }
4568
4569 pub async fn edit_inline_message(
4587 &self,
4588 id: tl::enums::InputBotInlineMessageId,
4589 new_text: &str,
4590 reply_markup: Option<tl::enums::ReplyMarkup>,
4591 ) -> Result<bool, InvocationError> {
4592 let req = tl::functions::messages::EditInlineBotMessage {
4593 no_webpage: false,
4594 invert_media: false,
4595 id,
4596 message: Some(new_text.to_string()),
4597 media: None,
4598 reply_markup,
4599 entities: None,
4600 };
4601 let body = self.rpc_call_raw(&req).await?;
4602 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4604 }
4605
4606 pub async fn answer_callback_query(
4608 &self,
4609 query_id: i64,
4610 text: Option<&str>,
4611 alert: bool,
4612 ) -> Result<bool, InvocationError> {
4613 let req = tl::functions::messages::SetBotCallbackAnswer {
4614 alert,
4615 query_id,
4616 message: text.map(|s| s.to_string()),
4617 url: None,
4618 cache_time: 0,
4619 };
4620 let body = self.rpc_call_raw(&req).await?;
4621 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4622 }
4623
4624 pub async fn answer_inline_query(
4625 &self,
4626 query_id: i64,
4627 results: Vec<tl::enums::InputBotInlineResult>,
4628 cache_time: i32,
4629 is_personal: bool,
4630 next_offset: Option<String>,
4631 ) -> Result<bool, InvocationError> {
4632 let req = tl::functions::messages::SetInlineBotResults {
4633 gallery: false,
4634 private: is_personal,
4635 query_id,
4636 results,
4637 cache_time,
4638 next_offset,
4639 switch_pm: None,
4640 switch_webview: None,
4641 };
4642 let body = self.rpc_call_raw(&req).await?;
4643 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4644 }
4645
4646 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
4650 let req = tl::functions::messages::GetDialogs {
4651 exclude_pinned: false,
4652 folder_id: None,
4653 offset_date: 0,
4654 offset_id: 0,
4655 offset_peer: tl::enums::InputPeer::Empty,
4656 limit,
4657 hash: 0,
4658 };
4659
4660 let body = self.rpc_call_raw(&req).await?;
4661 let mut cur = Cursor::from_slice(&body);
4662 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4663 tl::enums::messages::Dialogs::Dialogs(d) => d,
4664 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
4665 dialogs: d.dialogs,
4666 messages: d.messages,
4667 chats: d.chats,
4668 users: d.users,
4669 },
4670 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
4671 };
4672
4673 let msg_map: HashMap<i32, tl::enums::Message> = raw
4675 .messages
4676 .into_iter()
4677 .map(|m| {
4678 let id = match &m {
4679 tl::enums::Message::Message(x) => x.id,
4680 tl::enums::Message::Service(x) => x.id,
4681 tl::enums::Message::Empty(x) => x.id,
4682 };
4683 (id, m)
4684 })
4685 .collect();
4686
4687 let user_map: HashMap<i64, tl::enums::User> = raw
4689 .users
4690 .into_iter()
4691 .filter_map(|u| {
4692 if let tl::enums::User::User(ref uu) = u {
4693 Some((uu.id, u))
4694 } else {
4695 None
4696 }
4697 })
4698 .collect();
4699
4700 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4702 .chats
4703 .into_iter()
4704 .map(|c| {
4705 let id = match &c {
4706 tl::enums::Chat::Chat(x) => x.id,
4707 tl::enums::Chat::Forbidden(x) => x.id,
4708 tl::enums::Chat::Channel(x) => x.id,
4709 tl::enums::Chat::ChannelForbidden(x) => x.id,
4710 tl::enums::Chat::Empty(x) => x.id,
4711 };
4712 (id, c)
4713 })
4714 .collect();
4715
4716 {
4718 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4719 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4720 self.cache_users_and_chats(&u_list, &c_list).await;
4721 }
4722
4723 let result = raw
4724 .dialogs
4725 .into_iter()
4726 .map(|d| {
4727 let top_id = match &d {
4728 tl::enums::Dialog::Dialog(x) => x.top_message,
4729 _ => 0,
4730 };
4731 let peer = match &d {
4732 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
4733 _ => None,
4734 };
4735
4736 let message = msg_map.get(&top_id).cloned();
4737 let entity = peer.and_then(|p| match p {
4738 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
4739 _ => None,
4740 });
4741 let chat = peer.and_then(|p| match p {
4742 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
4743 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
4744 _ => None,
4745 });
4746
4747 Dialog {
4748 raw: d,
4749 message,
4750 entity,
4751 chat,
4752 }
4753 })
4754 .collect();
4755
4756 Ok(result)
4757 }
4758
4759 #[allow(dead_code)]
4761 async fn get_dialogs_raw(
4762 &self,
4763 req: tl::functions::messages::GetDialogs,
4764 ) -> Result<Vec<Dialog>, InvocationError> {
4765 let body = self.rpc_call_raw(&req).await?;
4766 let mut cur = Cursor::from_slice(&body);
4767 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4768 tl::enums::messages::Dialogs::Dialogs(d) => d,
4769 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
4770 dialogs: d.dialogs,
4771 messages: d.messages,
4772 chats: d.chats,
4773 users: d.users,
4774 },
4775 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
4776 };
4777
4778 let msg_map: HashMap<i32, tl::enums::Message> = raw
4779 .messages
4780 .into_iter()
4781 .map(|m| {
4782 let id = match &m {
4783 tl::enums::Message::Message(x) => x.id,
4784 tl::enums::Message::Service(x) => x.id,
4785 tl::enums::Message::Empty(x) => x.id,
4786 };
4787 (id, m)
4788 })
4789 .collect();
4790
4791 let user_map: HashMap<i64, tl::enums::User> = raw
4792 .users
4793 .into_iter()
4794 .filter_map(|u| {
4795 if let tl::enums::User::User(ref uu) = u {
4796 Some((uu.id, u))
4797 } else {
4798 None
4799 }
4800 })
4801 .collect();
4802
4803 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4804 .chats
4805 .into_iter()
4806 .map(|c| {
4807 let id = match &c {
4808 tl::enums::Chat::Chat(x) => x.id,
4809 tl::enums::Chat::Forbidden(x) => x.id,
4810 tl::enums::Chat::Channel(x) => x.id,
4811 tl::enums::Chat::ChannelForbidden(x) => x.id,
4812 tl::enums::Chat::Empty(x) => x.id,
4813 };
4814 (id, c)
4815 })
4816 .collect();
4817
4818 {
4819 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4820 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4821 self.cache_users_and_chats(&u_list, &c_list).await;
4822 }
4823
4824 let result = raw
4825 .dialogs
4826 .into_iter()
4827 .map(|d| {
4828 let top_id = match &d {
4829 tl::enums::Dialog::Dialog(x) => x.top_message,
4830 _ => 0,
4831 };
4832 let peer = match &d {
4833 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
4834 _ => None,
4835 };
4836
4837 let message = msg_map.get(&top_id).cloned();
4838 let entity = peer.and_then(|p| match p {
4839 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
4840 _ => None,
4841 });
4842 let chat = peer.and_then(|p| match p {
4843 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
4844 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
4845 _ => None,
4846 });
4847
4848 Dialog {
4849 raw: d,
4850 message,
4851 entity,
4852 chat,
4853 }
4854 })
4855 .collect();
4856
4857 Ok(result)
4858 }
4859
4860 async fn get_dialogs_raw_with_count(
4862 &self,
4863 req: tl::functions::messages::GetDialogs,
4864 ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
4865 let body = self.rpc_call_raw(&req).await?;
4866 let mut cur = Cursor::from_slice(&body);
4867 let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4868 tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
4869 tl::enums::messages::Dialogs::Slice(d) => {
4870 let cnt = Some(d.count);
4871 (
4872 tl::types::messages::Dialogs {
4873 dialogs: d.dialogs,
4874 messages: d.messages,
4875 chats: d.chats,
4876 users: d.users,
4877 },
4878 cnt,
4879 )
4880 }
4881 tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
4882 };
4883
4884 let msg_map: HashMap<i32, tl::enums::Message> = raw
4885 .messages
4886 .into_iter()
4887 .map(|m| {
4888 let id = match &m {
4889 tl::enums::Message::Message(x) => x.id,
4890 tl::enums::Message::Service(x) => x.id,
4891 tl::enums::Message::Empty(x) => x.id,
4892 };
4893 (id, m)
4894 })
4895 .collect();
4896
4897 let user_map: HashMap<i64, tl::enums::User> = raw
4898 .users
4899 .into_iter()
4900 .filter_map(|u| {
4901 if let tl::enums::User::User(ref uu) = u {
4902 Some((uu.id, u))
4903 } else {
4904 None
4905 }
4906 })
4907 .collect();
4908
4909 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4910 .chats
4911 .into_iter()
4912 .map(|c| {
4913 let id = match &c {
4914 tl::enums::Chat::Chat(x) => x.id,
4915 tl::enums::Chat::Forbidden(x) => x.id,
4916 tl::enums::Chat::Channel(x) => x.id,
4917 tl::enums::Chat::ChannelForbidden(x) => x.id,
4918 tl::enums::Chat::Empty(x) => x.id,
4919 };
4920 (id, c)
4921 })
4922 .collect();
4923
4924 {
4925 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4926 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4927 self.cache_users_and_chats(&u_list, &c_list).await;
4928 }
4929
4930 let result = raw
4931 .dialogs
4932 .into_iter()
4933 .map(|d| {
4934 let top_id = match &d {
4935 tl::enums::Dialog::Dialog(x) => x.top_message,
4936 _ => 0,
4937 };
4938 let peer = match &d {
4939 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
4940 _ => None,
4941 };
4942 let message = msg_map.get(&top_id).cloned();
4943 let entity = peer.and_then(|p| match p {
4944 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
4945 _ => None,
4946 });
4947 let chat = peer.and_then(|p| match p {
4948 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
4949 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
4950 _ => None,
4951 });
4952 Dialog {
4953 raw: d,
4954 message,
4955 entity,
4956 chat,
4957 }
4958 })
4959 .collect();
4960
4961 Ok((result, count))
4962 }
4963
4964 async fn get_messages_with_count(
4966 &self,
4967 peer: tl::enums::InputPeer,
4968 limit: i32,
4969 offset_id: i32,
4970 ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
4971 let req = tl::functions::messages::GetHistory {
4972 peer,
4973 offset_id,
4974 offset_date: 0,
4975 add_offset: 0,
4976 limit,
4977 max_id: 0,
4978 min_id: 0,
4979 hash: 0,
4980 };
4981 let body = self.rpc_call_raw(&req).await?;
4982 let mut cur = Cursor::from_slice(&body);
4983 let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4984 tl::enums::messages::Messages::Messages(m) => (m.messages, None),
4985 tl::enums::messages::Messages::Slice(m) => {
4986 let cnt = Some(m.count);
4987 (m.messages, cnt)
4988 }
4989 tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
4990 tl::enums::messages::Messages::NotModified(_) => (vec![], None),
4991 };
4992 Ok((
4993 msgs.into_iter()
4994 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4995 .collect(),
4996 count,
4997 ))
4998 }
4999
5000 pub async fn download_media_to_file(
5011 &self,
5012 location: tl::enums::InputFileLocation,
5013 path: impl AsRef<std::path::Path>,
5014 ) -> Result<(), InvocationError> {
5015 let bytes = self.download_media(location).await?;
5016 std::fs::write(path, &bytes).map_err(InvocationError::Io)?;
5017 Ok(())
5018 }
5019
5020 pub async fn delete_dialog(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5021 let peer = peer.into().resolve(self).await?;
5022 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5023 let req = tl::functions::messages::DeleteHistory {
5024 just_clear: false,
5025 revoke: false,
5026 peer: input_peer,
5027 max_id: 0,
5028 min_date: None,
5029 max_date: None,
5030 };
5031 self.rpc_write(&req).await
5032 }
5033
5034 pub async fn mark_as_read(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5036 let peer = peer.into().resolve(self).await?;
5037 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5038 match &input_peer {
5039 tl::enums::InputPeer::Channel(c) => {
5040 let req = tl::functions::channels::ReadHistory {
5041 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
5042 channel_id: c.channel_id,
5043 access_hash: c.access_hash,
5044 }),
5045 max_id: 0,
5046 };
5047 self.rpc_call_raw(&req).await?;
5048 }
5049 _ => {
5050 let req = tl::functions::messages::ReadHistory {
5051 peer: input_peer,
5052 max_id: 0,
5053 };
5054 self.rpc_call_raw(&req).await?;
5055 }
5056 }
5057 Ok(())
5058 }
5059
5060 pub async fn clear_mentions(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5062 let peer = peer.into().resolve(self).await?;
5063 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5064 let req = tl::functions::messages::ReadMentions {
5065 peer: input_peer,
5066 top_msg_id: None,
5067 };
5068 self.rpc_write(&req).await
5069 }
5070
5071 pub async fn send_chat_action(
5079 &self,
5080 peer: impl Into<PeerRef>,
5081 action: tl::enums::SendMessageAction,
5082 ) -> Result<(), InvocationError> {
5083 let peer = peer.into().resolve(self).await?;
5084 self.send_chat_action_ex(peer, action, None).await
5085 }
5086
5087 pub async fn join_chat(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5091 let peer = peer.into().resolve(self).await?;
5092 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5093 match input_peer {
5094 tl::enums::InputPeer::Channel(c) => {
5095 let req = tl::functions::channels::JoinChannel {
5096 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
5097 channel_id: c.channel_id,
5098 access_hash: c.access_hash,
5099 }),
5100 };
5101 self.rpc_call_raw(&req).await?;
5102 }
5103 tl::enums::InputPeer::Chat(c) => {
5104 let req = tl::functions::messages::AddChatUser {
5105 chat_id: c.chat_id,
5106 user_id: tl::enums::InputUser::UserSelf,
5107 fwd_limit: 0,
5108 };
5109 self.rpc_call_raw(&req).await?;
5110 }
5111 _ => {
5112 return Err(InvocationError::Deserialize(
5113 "cannot join this peer type".into(),
5114 ));
5115 }
5116 }
5117 Ok(())
5118 }
5119
5120 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
5122 let hash = Self::parse_invite_hash(link)
5123 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
5124 let req = tl::functions::messages::ImportChatInvite {
5125 hash: hash.to_string(),
5126 };
5127 self.rpc_write(&req).await
5128 }
5129
5130 pub fn parse_invite_hash(link: &str) -> Option<&str> {
5132 if let Some(pos) = link.find("/+") {
5133 return Some(&link[pos + 2..]);
5134 }
5135 if let Some(pos) = link.find("/joinchat/") {
5136 return Some(&link[pos + 10..]);
5137 }
5138 None
5139 }
5140
5141 pub async fn get_messages(
5145 &self,
5146 peer: tl::enums::InputPeer,
5147 limit: i32,
5148 offset_id: i32,
5149 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
5150 let req = tl::functions::messages::GetHistory {
5151 peer,
5152 offset_id,
5153 offset_date: 0,
5154 add_offset: 0,
5155 limit,
5156 max_id: 0,
5157 min_id: 0,
5158 hash: 0,
5159 };
5160 let body = self.rpc_call_raw(&req).await?;
5161 let mut cur = Cursor::from_slice(&body);
5162 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
5163 tl::enums::messages::Messages::Messages(m) => m.messages,
5164 tl::enums::messages::Messages::Slice(m) => m.messages,
5165 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
5166 tl::enums::messages::Messages::NotModified(_) => vec![],
5167 };
5168 Ok(msgs
5169 .into_iter()
5170 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
5171 .collect())
5172 }
5173
5174 pub async fn resolve_peer(&self, peer: &str) -> Result<tl::enums::Peer, InvocationError> {
5178 match peer.trim() {
5179 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
5180 username if username.starts_with('@') => self.resolve_username(&username[1..]).await,
5181 id_str => {
5182 if let Ok(id) = id_str.parse::<i64>() {
5183 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
5184 } else {
5185 Err(InvocationError::Deserialize(format!(
5186 "cannot resolve peer: {peer}"
5187 )))
5188 }
5189 }
5190 }
5191 }
5192
5193 pub async fn resolve_username(
5197 &self,
5198 username: &str,
5199 ) -> Result<tl::enums::Peer, InvocationError> {
5200 let req = tl::functions::contacts::ResolveUsername {
5201 username: username.to_string(),
5202 referer: None,
5203 };
5204 let body = self.rpc_call_raw(&req).await?;
5205 let mut cur = Cursor::from_slice(&body);
5206 let tl::enums::contacts::ResolvedPeer::ResolvedPeer(resolved) =
5207 tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)?;
5208 self.cache_users_slice(&resolved.users).await;
5210 self.cache_chats_slice(&resolved.chats).await;
5211 Ok(resolved.peer)
5212 }
5213
5214 fn spawn_salt_fetch_if_needed(&self) {
5224 if self
5225 .inner
5226 .salt_request_in_flight
5227 .compare_exchange(
5228 false,
5229 true,
5230 std::sync::atomic::Ordering::SeqCst,
5231 std::sync::atomic::Ordering::SeqCst,
5232 )
5233 .is_err()
5234 {
5235 return; }
5237 let inner = Arc::clone(&self.inner);
5238 tokio::spawn(async move {
5239 tracing::debug!("[layer] proactive GetFutureSalts spawned");
5240 let mut req_body = Vec::with_capacity(8);
5241 req_body.extend_from_slice(&0xb921bd04_u32.to_le_bytes()); req_body.extend_from_slice(&64_i32.to_le_bytes()); let (wire, fk, fs_msg_id) = {
5244 let mut w = inner.writer.lock().await;
5245 let fk = w.frame_kind.clone();
5246 let (wire, id) = w.enc.pack_body_with_msg_id(&req_body, true);
5247 w.sent_bodies.insert(id, req_body);
5248 (wire, fk, id)
5249 };
5250 let (tx, rx) = tokio::sync::oneshot::channel();
5251 inner.pending.lock().await.insert(fs_msg_id, tx);
5252 let send_ok = {
5253 send_frame_write(&mut *inner.write_half.lock().await, &wire, &fk)
5254 .await
5255 .is_ok()
5256 };
5257 if !send_ok {
5258 inner.pending.lock().await.remove(&fs_msg_id);
5259 inner.writer.lock().await.sent_bodies.remove(&fs_msg_id);
5260 inner
5261 .salt_request_in_flight
5262 .store(false, std::sync::atomic::Ordering::SeqCst);
5263 return;
5264 }
5265 let _ = rx.await;
5266 inner
5267 .salt_request_in_flight
5268 .store(false, std::sync::atomic::Ordering::SeqCst);
5269 });
5270 }
5271
5272 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
5273 let body = self.rpc_call_raw(req).await?;
5274 let mut cur = Cursor::from_slice(&body);
5275 R::Return::deserialize(&mut cur).map_err(Into::into)
5276 }
5277
5278 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
5279 let mut rl = RetryLoop::new(Arc::clone(&self.inner.retry_policy));
5280 loop {
5281 match self.do_rpc_call(req).await {
5282 Ok(body) => return Ok(body),
5283 Err(e) if e.migrate_dc_id().is_some() => {
5284 self.migrate_to(e.migrate_dc_id().unwrap()).await?;
5287 }
5288 Err(InvocationError::Rpc(ref r)) if r.code == 401 => {
5295 return Err(InvocationError::Rpc(r.clone()));
5296 }
5297 Err(e) => rl.advance(e).await?,
5298 }
5299 }
5300 }
5301
5302 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
5312 let (tx, rx) = oneshot::channel();
5313 let wire = {
5314 let raw_body = req.to_bytes();
5315 let body = maybe_gz_pack(&raw_body);
5317
5318 let mut w = self.inner.writer.lock().await;
5319
5320 if w.advance_salt_if_needed() {
5324 drop(w); self.spawn_salt_fetch_if_needed();
5326 w = self.inner.writer.lock().await;
5327 }
5328
5329 let fk = w.frame_kind.clone();
5330
5331 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
5334
5335 if acks.is_empty() {
5336 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5338 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5340 (wire, fk)
5341 } else {
5342 let ack_body = build_msgs_ack_body(&acks);
5344 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false); let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true); let container_payload = build_container_body(&[
5348 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5349 (req_msg_id, req_seqno, body.as_slice()),
5350 ]);
5351
5352 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5353
5354 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5357 tracing::debug!(
5358 "[layer] container: bundled {} acks + request (cid={container_msg_id})",
5359 acks.len()
5360 );
5361 (wire, fk)
5362 }
5363 };
5365 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5367 match rx.await {
5368 Ok(result) => result,
5369 Err(_) => Err(InvocationError::Deserialize(
5370 "RPC channel closed (reader died?)".into(),
5371 )),
5372 }
5373 }
5374
5375 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
5378 let mut fail_count = NonZeroU32::new(1).unwrap();
5379 let mut slept_so_far = Duration::default();
5380 loop {
5381 let result = self.do_rpc_write(req).await;
5382 match result {
5383 Ok(()) => return Ok(()),
5384 Err(e) => {
5385 let ctx = RetryContext {
5386 fail_count,
5387 slept_so_far,
5388 error: e,
5389 };
5390 match self.inner.retry_policy.should_retry(&ctx) {
5391 ControlFlow::Continue(delay) => {
5392 sleep(delay).await;
5393 slept_so_far += delay;
5394 fail_count = fail_count.saturating_add(1);
5395 }
5396 ControlFlow::Break(()) => return Err(ctx.error),
5397 }
5398 }
5399 }
5400 }
5401 }
5402
5403 async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
5404 let (tx, rx) = oneshot::channel();
5405 let wire = {
5406 let raw_body = req.to_bytes();
5407 let body = maybe_gz_pack(&raw_body);
5409
5410 let mut w = self.inner.writer.lock().await;
5411 let fk = w.frame_kind.clone();
5412
5413 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
5415
5416 if acks.is_empty() {
5417 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5418 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5420 (wire, fk)
5421 } else {
5422 let ack_body = build_msgs_ack_body(&acks);
5423 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
5424 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
5425 let container_payload = build_container_body(&[
5426 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5427 (req_msg_id, req_seqno, body.as_slice()),
5428 ]);
5429 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5430 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5433 tracing::debug!(
5434 "[layer] write container: bundled {} acks + write (cid={container_msg_id})",
5435 acks.len()
5436 );
5437 (wire, fk)
5438 }
5439 };
5441 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5442 match rx.await {
5443 Ok(result) => result.map(|_| ()),
5444 Err(_) => Err(InvocationError::Deserialize(
5445 "rpc_write channel closed".into(),
5446 )),
5447 }
5448 }
5449
5450 async fn init_connection(&self) -> Result<(), InvocationError> {
5453 use tl::functions::{InitConnection, InvokeWithLayer, help::GetConfig};
5454 let req = InvokeWithLayer {
5455 layer: tl::LAYER,
5456 query: InitConnection {
5457 api_id: self.inner.api_id,
5458 device_model: self.inner.device_model.clone(),
5459 system_version: self.inner.system_version.clone(),
5460 app_version: self.inner.app_version.clone(),
5461 system_lang_code: self.inner.system_lang_code.clone(),
5462 lang_pack: self.inner.lang_pack.clone(),
5463 lang_code: self.inner.lang_code.clone(),
5464 proxy: None,
5465 params: None,
5466 query: GetConfig {},
5467 },
5468 };
5469
5470 let body = self.rpc_call_raw_serializable(&req).await?;
5472
5473 let mut cur = Cursor::from_slice(&body);
5474 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
5475 let allow_ipv6 = self.inner.allow_ipv6;
5476 let mut opts = self.inner.dc_options.lock().await;
5477 let mut media_opts = self.inner.media_dc_options.lock().await;
5478 for opt in &cfg.dc_options {
5479 let tl::enums::DcOption::DcOption(o) = opt;
5480 if o.ipv6 && !allow_ipv6 {
5481 continue;
5482 }
5483 let addr = format!("{}:{}", o.ip_address, o.port);
5484 let mut flags = DcFlags::NONE;
5485 if o.ipv6 {
5486 flags.set(DcFlags::IPV6);
5487 }
5488 if o.media_only {
5489 flags.set(DcFlags::MEDIA_ONLY);
5490 }
5491 if o.tcpo_only {
5492 flags.set(DcFlags::TCPO_ONLY);
5493 }
5494 if o.cdn {
5495 flags.set(DcFlags::CDN);
5496 }
5497 if o.r#static {
5498 flags.set(DcFlags::STATIC);
5499 }
5500
5501 if o.media_only || o.cdn {
5502 let e = media_opts.entry(o.id).or_insert_with(|| DcEntry {
5503 dc_id: o.id,
5504 addr: addr.clone(),
5505 auth_key: None,
5506 first_salt: 0,
5507 time_offset: 0,
5508 flags,
5509 });
5510 e.addr = addr;
5511 e.flags = flags;
5512 } else if !o.tcpo_only {
5513 let e = opts.entry(o.id).or_insert_with(|| DcEntry {
5514 dc_id: o.id,
5515 addr: addr.clone(),
5516 auth_key: None,
5517 first_salt: 0,
5518 time_offset: 0,
5519 flags,
5520 });
5521 e.addr = addr;
5522 e.flags = flags;
5523 }
5524 }
5525 tracing::info!(
5526 "[layer] initConnection ✓ ({} DCs, ipv6={})",
5527 cfg.dc_options.len(),
5528 allow_ipv6
5529 );
5530 }
5531 Ok(())
5532 }
5533
    /// Switches the client's main connection to DC `new_dc_id`.
    ///
    /// Steps, in order:
    /// 1. Look up the DC address in `dc_options` (falling back to
    ///    `dc_migration::fallback_dc_addr`) and any auth key stored for it.
    /// 2. Open a new connection, reusing the stored key when there is one and running a
    ///    fresh key exchange (`connect_raw`) otherwise.
    /// 3. Record the connection's auth key in `dc_options`.
    /// 4. Replace the shared writer, write half and `home_dc_id`, and hand the new read
    ///    half to the reader through `reconnect_tx`.
    /// 5. Run `init_connection` on the new connection, sleeping and retrying for as long
    ///    as the server answers with a flood wait.
    /// 6. Persist the session (best effort).
    ///
    /// # Errors
    /// Returns connection errors and any non-flood-wait error from `init_connection`.
    /// Note that by the time `init_connection` runs, the client state has already been
    /// switched to the new DC.
    async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
        let addr = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&new_dc_id)
                .map(|e| e.addr.clone())
                .unwrap_or_else(|| crate::dc_migration::fallback_dc_addr(new_dc_id).to_string())
        };
        tracing::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");

        // Second, separate lock acquisition: the address and key are not read atomically.
        let saved_key = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&new_dc_id).and_then(|e| e.auth_key)
        };

        let socks5 = self.inner.socks5.clone();
        let mtproxy = self.inner.mtproxy.clone();
        let transport = self.inner.transport.clone();
        let conn = if let Some(key) = saved_key {
            // NOTE(review): the two zeros look like first_salt / time_offset (the stored
            // `DcEntry` values are not passed), and `mtproxy` is not forwarded on this
            // path — confirm both against `connect_with_key`.
            Connection::connect_with_key(
                &addr,
                key,
                0,
                0,
                socks5.as_ref(),
                &transport,
                new_dc_id as i16,
            )
            .await?
        } else {
            Connection::connect_raw(
                &addr,
                socks5.as_ref(),
                mtproxy.as_ref(),
                &transport,
                new_dc_id as i16,
            )
            .await?
        };

        // Remember the key for this DC, creating the entry if the DC was unknown so far.
        let new_key = conn.auth_key_bytes();
        {
            let mut opts = self.inner.dc_options.lock().await;
            let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
                dc_id: new_dc_id,
                addr: addr.clone(),
                auth_key: None,
                first_salt: 0,
                time_offset: 0,
                flags: DcFlags::NONE,
            });
            entry.auth_key = Some(new_key);
        }

        // Swap the shared connection state; each lock is taken and released on its own.
        let (new_writer, new_wh, new_read, new_fk) = conn.into_writer();
        let new_ak = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        *self.inner.writer.lock().await = new_writer;
        *self.inner.write_half.lock().await = new_wh;
        *self.inner.home_dc_id.lock().await = new_dc_id;

        // Hand the new read half to whoever listens on `reconnect_tx`; a send failure
        // is deliberately ignored.
        let _ = self
            .inner
            .reconnect_tx
            .send((new_read, new_fk, new_ak, new_sid));

        // Retry initConnection for as long as the server asks us to wait.
        // NOTE(review): there is no upper bound on the number of flood-wait rounds.
        loop {
            match self.init_connection().await {
                Ok(()) => break,
                Err(InvocationError::Rpc(ref r)) if r.flood_wait_seconds().is_some() => {
                    let secs = r.flood_wait_seconds().unwrap();
                    tracing::warn!(
                        "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs}: waiting"
                    );
                    // One extra second on top of the server-provided wait.
                    sleep(Duration::from_secs(secs + 1)).await;
                }
                Err(e) => return Err(e),
            }
        }

        // Best effort: a failed save does not fail the migration.
        self.save_session().await.ok();
        tracing::info!("[layer] Now on DC{new_dc_id} ✓");
        Ok(())
    }
5631
5632 pub fn disconnect(&self) {
5642 self.inner.shutdown_token.cancel();
5643 }
5644
5645 pub async fn sync_update_state(&self) {
5653 let _ = self.sync_pts_state().await;
5654 }
5655
5656 async fn cache_user(&self, user: &tl::enums::User) {
5659 self.inner.peer_cache.write().await.cache_user(user);
5660 }
5661
5662 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
5663 let mut cache = self.inner.peer_cache.write().await;
5664 cache.cache_users(users);
5665 }
5666
5667 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
5668 let mut cache = self.inner.peer_cache.write().await;
5669 cache.cache_chats(chats);
5670 }
5671
5672 async fn cache_users_and_chats(&self, users: &[tl::enums::User], chats: &[tl::enums::Chat]) {
5674 let mut cache = self.inner.peer_cache.write().await;
5675 cache.cache_users(users);
5676 cache.cache_chats(chats);
5677 }
5678
5679 #[doc(hidden)]
5681 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
5682 self.cache_users_slice(users).await;
5683 }
5684
5685 #[doc(hidden)]
5686 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
5687 self.cache_chats_slice(chats).await;
5688 }
5689
5690 #[doc(hidden)]
5692 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(
5693 &self,
5694 req: &R,
5695 ) -> Result<Vec<u8>, InvocationError> {
5696 self.rpc_call_raw(req).await
5697 }
5698
5699 async fn rpc_call_raw_serializable<S: tl::Serializable>(
5701 &self,
5702 req: &S,
5703 ) -> Result<Vec<u8>, InvocationError> {
5704 let mut fail_count = NonZeroU32::new(1).unwrap();
5705 let mut slept_so_far = Duration::default();
5706 loop {
5707 match self.do_rpc_write_returning_body(req).await {
5708 Ok(body) => return Ok(body),
5709 Err(e) => {
5710 let ctx = RetryContext {
5711 fail_count,
5712 slept_so_far,
5713 error: e,
5714 };
5715 match self.inner.retry_policy.should_retry(&ctx) {
5716 ControlFlow::Continue(delay) => {
5717 sleep(delay).await;
5718 slept_so_far += delay;
5719 fail_count = fail_count.saturating_add(1);
5720 }
5721 ControlFlow::Break(()) => return Err(ctx.error),
5722 }
5723 }
5724 }
5725 }
5726 }
5727
5728 async fn do_rpc_write_returning_body<S: tl::Serializable>(
5729 &self,
5730 req: &S,
5731 ) -> Result<Vec<u8>, InvocationError> {
5732 let (tx, rx) = oneshot::channel();
5733 let wire = {
5734 let raw_body = req.to_bytes();
5735 let body = maybe_gz_pack(&raw_body); let mut w = self.inner.writer.lock().await;
5737 let fk = w.frame_kind.clone();
5738 let acks: Vec<i64> = w.pending_ack.drain(..).collect(); if acks.is_empty() {
5740 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5741 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5743 (wire, fk)
5744 } else {
5745 let ack_body = build_msgs_ack_body(&acks);
5746 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
5747 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
5748 let container_payload = build_container_body(&[
5749 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5750 (req_msg_id, req_seqno, body.as_slice()),
5751 ]);
5752 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5753 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5756 (wire, fk)
5757 }
5758 };
5760 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5761 match rx.await {
5762 Ok(result) => result,
5763 Err(_) => Err(InvocationError::Deserialize("rpc channel closed".into())),
5764 }
5765 }
5766
5767 pub async fn count_channels(&self) -> Result<usize, InvocationError> {
5770 let mut iter = self.iter_dialogs();
5771 let mut count = 0usize;
5772 while let Some(dialog) = iter.next(self).await? {
5773 if matches!(dialog.peer(), Some(tl::enums::Peer::Channel(_))) {
5774 count += 1;
5775 }
5776 }
5777 Ok(count)
5778 }
5779
5780 pub fn iter_dialogs(&self) -> DialogIter {
5794 DialogIter {
5795 offset_date: 0,
5796 offset_id: 0,
5797 offset_peer: tl::enums::InputPeer::Empty,
5798 done: false,
5799 buffer: VecDeque::new(),
5800 total: None,
5801 }
5802 }
5803
5804 pub fn iter_messages(&self, peer: impl Into<PeerRef>) -> MessageIter {
5818 MessageIter {
5819 unresolved: Some(peer.into()),
5820 peer: None,
5821 offset_id: 0,
5822 done: false,
5823 buffer: VecDeque::new(),
5824 total: None,
5825 }
5826 }
5827
5828 pub async fn resolve_to_input_peer(
5833 &self,
5834 peer: &tl::enums::Peer,
5835 ) -> Result<tl::enums::InputPeer, InvocationError> {
5836 let cache = self.inner.peer_cache.read().await;
5837 match peer {
5838 tl::enums::Peer::User(u) => {
5839 if u.user_id == 0 {
5840 return Ok(tl::enums::InputPeer::PeerSelf);
5841 }
5842 match cache.users.get(&u.user_id) {
5843 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
5844 user_id: u.user_id,
5845 access_hash: hash,
5846 })),
5847 None => Err(InvocationError::Deserialize(format!(
5848 "access_hash unknown for user {}; resolve via username first",
5849 u.user_id
5850 ))),
5851 }
5852 }
5853 tl::enums::Peer::Chat(c) => Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat {
5854 chat_id: c.chat_id,
5855 })),
5856 tl::enums::Peer::Channel(c) => match cache.channels.get(&c.channel_id) {
5857 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
5858 channel_id: c.channel_id,
5859 access_hash: hash,
5860 })),
5861 None => Err(InvocationError::Deserialize(format!(
5862 "access_hash unknown for channel {}; resolve via username first",
5863 c.channel_id
5864 ))),
5865 },
5866 }
5867 }
5868
5869 pub async fn invoke_on_dc<R: RemoteCall>(
5877 &self,
5878 dc_id: i32,
5879 req: &R,
5880 ) -> Result<R::Return, InvocationError> {
5881 let body = self.rpc_on_dc_raw(dc_id, req).await?;
5882 let mut cur = Cursor::from_slice(&body);
5883 R::Return::deserialize(&mut cur).map_err(Into::into)
5884 }
5885
    /// Invokes `req` on DC `dc_id` through the DC pool and returns the raw response bytes.
    ///
    /// If the pool has no connection for `dc_id`, one is opened first: with the auth key
    /// stored in `dc_options` when there is one, otherwise through a fresh key exchange
    /// followed (for DCs other than the home DC) by an authorization export/import. The
    /// connection's auth key is written back to `dc_options` (only if an entry for the DC
    /// already exists) and the connection is inserted into the pool.
    ///
    /// # Errors
    /// Returns connection errors and the error of the pooled invocation. A failed
    /// authorization export/import is only logged as a warning.
    async fn rpc_on_dc_raw<R: RemoteCall>(
        &self,
        dc_id: i32,
        req: &R,
    ) -> Result<Vec<u8>, InvocationError> {
        // The pool lock is released before connecting, so two concurrent callers may
        // both decide to open a connection for the same DC.
        let needs_new = {
            let pool = self.inner.dc_pool.lock().await;
            !pool.has_connection(dc_id)
        };

        if needs_new {
            let addr = {
                let opts = self.inner.dc_options.lock().await;
                opts.get(&dc_id)
                    .map(|e| e.addr.clone())
                    .unwrap_or_else(|| crate::dc_migration::fallback_dc_addr(dc_id).to_string())
            };

            let socks5 = self.inner.socks5.clone();
            let transport = self.inner.transport.clone();
            let saved_key = {
                let opts = self.inner.dc_options.lock().await;
                opts.get(&dc_id).and_then(|e| e.auth_key)
            };

            let dc_conn = if let Some(key) = saved_key {
                dc_pool::DcConnection::connect_with_key(
                    &addr,
                    key,
                    0,
                    0,
                    socks5.as_ref(),
                    &transport,
                    dc_id as i16,
                )
                .await?
            } else {
                let conn = dc_pool::DcConnection::connect_raw(
                    &addr,
                    socks5.as_ref(),
                    &transport,
                    dc_id as i16,
                )
                .await?;
                let home_dc_id = *self.inner.home_dc_id.lock().await;
                // NOTE(review): at this point `conn` has not been inserted into the pool,
                // and `export_import_auth` does not use the connection it is given — it
                // invokes the import through the pool. Confirm the import really reaches
                // this connection (and its auth key).
                if dc_id != home_dc_id
                    && let Err(e) = self.export_import_auth(dc_id, &conn).await
                {
                    tracing::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
                }
                conn
            };

            // Persist the key only when the DC already has an entry; unknown DCs are not
            // added to `dc_options` here.
            let key = dc_conn.auth_key_bytes();
            {
                let mut opts = self.inner.dc_options.lock().await;
                if let Some(e) = opts.get_mut(&dc_id) {
                    e.auth_key = Some(key);
                }
            }
            self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
        }

        // Snapshot the DC table so the `dc_options` lock is not held during the call.
        let dc_entries: Vec<DcEntry> = self
            .inner
            .dc_options
            .lock()
            .await
            .values()
            .cloned()
            .collect();
        // The pool lock is held for the whole duration of the invocation.
        self.inner
            .dc_pool
            .lock()
            .await
            .invoke_on_dc(dc_id, &dc_entries, req)
            .await
    }
5967
5968 async fn export_import_auth(
5970 &self,
5971 dc_id: i32,
5972 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
5974 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
5976 let body = self.rpc_call_raw(&export_req).await?;
5977 let mut cur = Cursor::from_slice(&body);
5978 let tl::enums::auth::ExportedAuthorization::ExportedAuthorization(exported) =
5979 tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)?;
5980
5981 let import_req = tl::functions::auth::ImportAuthorization {
5983 id: exported.id,
5984 bytes: exported.bytes,
5985 };
5986 let dc_entries: Vec<DcEntry> = self
5987 .inner
5988 .dc_options
5989 .lock()
5990 .await
5991 .values()
5992 .cloned()
5993 .collect();
5994 self.inner
5995 .dc_pool
5996 .lock()
5997 .await
5998 .invoke_on_dc(dc_id, &dc_entries, &import_req)
5999 .await?;
6000 tracing::debug!("[layer] Auth exported+imported to DC{dc_id} ✓");
6001 Ok(())
6002 }
6003
6004 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
6007 let body = self
6008 .rpc_call_raw(&tl::functions::account::GetPassword {})
6009 .await?;
6010 let mut cur = Cursor::from_slice(&body);
6011 let tl::enums::account::Password::Password(pw) =
6012 tl::enums::account::Password::deserialize(&mut cur)?;
6013 Ok(PasswordToken { password: pw })
6014 }
6015
6016 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
6017 tl::functions::auth::SendCode {
6018 phone_number: phone.to_string(),
6019 api_id: self.inner.api_id,
6020 api_hash: self.inner.api_hash.clone(),
6021 settings: tl::enums::CodeSettings::CodeSettings(tl::types::CodeSettings {
6022 allow_flashcall: false,
6023 current_number: false,
6024 allow_app_hash: false,
6025 allow_missed_call: false,
6026 allow_firebase: false,
6027 unknown_number: false,
6028 logout_tokens: None,
6029 token: None,
6030 app_sandbox: None,
6031 }),
6032 }
6033 }
6034
6035 fn extract_user_name(user: &tl::enums::User) -> String {
6036 match user {
6037 tl::enums::User::User(u) => format!(
6038 "{} {}",
6039 u.first_name.as_deref().unwrap_or(""),
6040 u.last_name.as_deref().unwrap_or("")
6041 )
6042 .trim()
6043 .to_string(),
6044 tl::enums::User::Empty(_) => "(unknown)".into(),
6045 }
6046 }
6047
6048 #[allow(clippy::type_complexity)]
6049 fn extract_password_params(
6050 algo: &tl::enums::PasswordKdfAlgo,
6051 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
6052 match algo {
6053 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
6054 Ok((&a.salt1, &a.salt2, &a.p, a.g))
6055 }
6056 _ => Err(InvocationError::Deserialize(
6057 "unsupported password KDF algo".into(),
6058 )),
6059 }
6060 }
6061}
6062
6063pub(crate) fn attach_client_to_update(u: update::Update, client: &Client) -> update::Update {
6066 match u {
6067 update::Update::NewMessage(msg) => {
6068 update::Update::NewMessage(msg.with_client(client.clone()))
6069 }
6070 update::Update::MessageEdited(msg) => {
6071 update::Update::MessageEdited(msg.with_client(client.clone()))
6072 }
6073 other => other,
6074 }
6075}
6076
/// Lazy, paged cursor over the account's dialogs, created by `Client::iter_dialogs`.
///
/// Pages are fetched by [`DialogIter::next`]; the `offset_*` fields hold the position of
/// the last dialog seen and are sent with the next page request.
pub struct DialogIter {
    /// Date of the last seen dialog's top message (0 before the first page or when the
    /// dialog carries no regular/service message).
    offset_date: i32,
    /// Id of the last seen dialog's top message.
    offset_id: i32,
    /// Input peer of the last seen dialog.
    offset_peer: tl::enums::InputPeer,
    /// Set once a page shorter than the page size has been received.
    done: bool,
    /// Dialogs fetched but not yet handed out.
    buffer: VecDeque<Dialog>,
    /// Total dialog count as reported with the first page, if any.
    pub total: Option<i32>,
}
6090
6091impl DialogIter {
6092 const PAGE_SIZE: i32 = 100;
6093
6094 pub fn total(&self) -> Option<i32> {
6100 self.total
6101 }
6102
6103 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
6105 if let Some(d) = self.buffer.pop_front() {
6106 return Ok(Some(d));
6107 }
6108 if self.done {
6109 return Ok(None);
6110 }
6111
6112 let req = tl::functions::messages::GetDialogs {
6113 exclude_pinned: false,
6114 folder_id: None,
6115 offset_date: self.offset_date,
6116 offset_id: self.offset_id,
6117 offset_peer: self.offset_peer.clone(),
6118 limit: Self::PAGE_SIZE,
6119 hash: 0,
6120 };
6121
6122 let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
6123 if self.total.is_none() {
6125 self.total = count;
6126 }
6127 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
6128 self.done = true;
6129 }
6130
6131 if let Some(last) = dialogs.last() {
6133 self.offset_date = last
6134 .message
6135 .as_ref()
6136 .map(|m| match m {
6137 tl::enums::Message::Message(x) => x.date,
6138 tl::enums::Message::Service(x) => x.date,
6139 _ => 0,
6140 })
6141 .unwrap_or(0);
6142 self.offset_id = last.top_message();
6143 if let Some(peer) = last.peer() {
6144 self.offset_peer = client.inner.peer_cache.read().await.peer_to_input(peer);
6145 }
6146 }
6147
6148 self.buffer.extend(dialogs);
6149 Ok(self.buffer.pop_front())
6150 }
6151}
6152
/// Lazy, paged cursor over the messages of one peer, created by `Client::iter_messages`.
pub struct MessageIter {
    /// Peer reference as given by the caller; taken (left as `None`) when it is resolved
    /// on the first fetch.
    unresolved: Option<PeerRef>,
    /// Resolved peer, filled in by the first successful resolution.
    peer: Option<tl::enums::Peer>,
    /// Id of the last message seen; sent with the next page request.
    offset_id: i32,
    /// Set once a page shorter than the page size has been received.
    done: bool,
    /// Messages fetched but not yet handed out.
    buffer: VecDeque<update::IncomingMessage>,
    /// Total message count as reported with the first page, if any.
    pub total: Option<i32>,
}
6165
6166impl MessageIter {
6167 const PAGE_SIZE: i32 = 100;
6168
6169 pub fn total(&self) -> Option<i32> {
6174 self.total
6175 }
6176
6177 pub async fn next(
6179 &mut self,
6180 client: &Client,
6181 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
6182 if let Some(m) = self.buffer.pop_front() {
6183 return Ok(Some(m));
6184 }
6185 if self.done {
6186 return Ok(None);
6187 }
6188
6189 let peer = if let Some(p) = &self.peer {
6191 p.clone()
6192 } else {
6193 let pr = self.unresolved.take().expect("MessageIter: peer not set");
6194 let p = pr.resolve(client).await?;
6195 self.peer = Some(p.clone());
6196 p
6197 };
6198
6199 let input_peer = client.inner.peer_cache.read().await.peer_to_input(&peer);
6200 let (page, count) = client
6201 .get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id)
6202 .await?;
6203
6204 if self.total.is_none() {
6205 self.total = count;
6206 }
6207
6208 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
6209 self.done = true;
6210 }
6211 if let Some(last) = page.last() {
6212 self.offset_id = last.id();
6213 }
6214
6215 self.buffer.extend(page);
6216 Ok(self.buffer.pop_front())
6217 }
6218}
6219
/// Public, doc-hidden forwarder to the crate-internal `random_i64`.
#[doc(hidden)]
pub fn random_i64_pub() -> i64 {
    random_i64()
}
6227
/// Returns `true` when `body` is exactly the four bytes of the constructor id
/// `0x997275b5` in little-endian order.
pub fn is_bool_true(body: &[u8]) -> bool {
    body == 0x997275b5_u32.to_le_bytes().as_slice()
}
6231
/// Returns `true` when `body` is exactly the four bytes of the constructor id
/// `0xbc799737` in little-endian order.
pub fn is_bool_false(body: &[u8]) -> bool {
    body == 0xbc799737_u32.to_le_bytes().as_slice()
}
6235
/// Framing selected for a connection, together with any per-connection state that
/// framing needs. Cloning shares the state behind the `Arc`s.
#[derive(Clone)]
enum FrameKind {
    /// Selected after sending the single-byte `0xef` transport tag.
    Abridged,
    /// Selected after sending the `0xeeeeeeee` transport tag.
    Intermediate,
    /// Framing with send/receive sequence counters, both starting at 0.
    #[allow(dead_code)]
    Full {
        send_seqno: Arc<std::sync::atomic::AtomicU32>,
        recv_seqno: Arc<std::sync::atomic::AtomicU32>,
    },
    /// Obfuscated transport whose 64-byte init header carried the `0xef` tag.
    Obfuscated {
        /// Stream cipher state shared by everyone holding a clone of this frame kind.
        cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
    },
    /// Obfuscated transport whose 64-byte init header carried the `0xdd` tag.
    PaddedIntermediate {
        /// Stream cipher state shared by everyone holding a clone of this frame kind.
        cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
    },
    /// Transport opened with a TLS-ClientHello-shaped first record.
    FakeTls {
        /// Stream cipher state shared by everyone holding a clone of this frame kind.
        cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
    },
}
6267
/// One server salt together with its validity window.
///
/// The window bounds are compared against server time in seconds by
/// `ConnectionWriter::advance_salt_if_needed`.
#[derive(Clone, Debug)]
struct FutureSalt {
    /// Start of the validity window (server time, seconds).
    valid_since: i32,
    /// End of the validity window (server time, seconds).
    valid_until: i32,
    /// The salt value itself.
    salt: i64,
}
6279
/// Seconds that must have passed since a salt's `valid_since` before
/// `ConnectionWriter::advance_salt_if_needed` switches to it.
const SALT_USE_DELAY: i32 = 60;
6283
/// Write-side state of the main connection: the encrypted session plus the bookkeeping
/// needed to pack outgoing messages.
struct ConnectionWriter {
    /// Encrypted session used to allocate message ids / sequence numbers and to pack
    /// outgoing bodies and containers.
    enc: EncryptedSession,
    /// Framing used when writing packed messages to the socket.
    frame_kind: FrameKind,
    /// Message ids waiting to be acknowledged; drained and bundled into a `msgs_ack`
    /// with the next outgoing request.
    pending_ack: Vec<i64>,
    /// Packed request bodies keyed by the message id they were sent under.
    sent_bodies: std::collections::HashMap<i64, Vec<u8>>,
    /// Maps a container's message id to the id of the request it carried.
    container_map: std::collections::HashMap<i64, i64>,
    /// Known salts with their validity windows.
    /// NOTE(review): `advance_salt_if_needed` prunes from the front, so this is
    /// presumably kept ordered by validity — confirm where it is filled.
    salts: Vec<FutureSalt>,
    /// Reference point for estimating the current server time: a server timestamp in
    /// seconds and the local `Instant` it is paired with. `None` until known.
    start_salt_time: Option<(i32, std::time::Instant)>,
}
6313
impl ConnectionWriter {
    /// Returns the session's 256-byte auth key.
    fn auth_key_bytes(&self) -> [u8; 256] {
        self.enc.auth_key_bytes()
    }
    /// Returns the salt currently used by the session.
    fn first_salt(&self) -> i64 {
        self.enc.salt
    }
    /// Returns the session's time offset.
    fn time_offset(&self) -> i32 {
        self.enc.time_offset
    }

    /// Prunes expired salts and switches the session to a newer salt when one has been
    /// valid for at least `SALT_USE_DELAY` seconds.
    ///
    /// Returns `true` when at most one salt is left in `salts` afterwards
    /// (presumably the caller's cue to request more salts — confirm at the call site).
    /// Without a server-time reference (`start_salt_time` is `None`) nothing is changed
    /// and only that count check is returned.
    fn advance_salt_if_needed(&mut self) -> bool {
        let Some((server_now, start_instant)) = self.start_salt_time else {
            return self.salts.len() <= 1;
        };

        // Estimated current server time: reference timestamp plus locally elapsed seconds.
        let now = server_now + start_instant.elapsed().as_secs() as i32;

        // Drop expired salts from the front, but always keep at least one entry.
        while self.salts.len() > 1 && now > self.salts[0].valid_until {
            let expired = self.salts.remove(0);
            tracing::debug!(
                "[layer] salt {:#x} expired (valid_until={}), pruned",
                expired.salt,
                expired.valid_until,
            );
        }

        if self.salts.len() > 1 {
            // Scanning from the back, pick the first salt that has been valid for at
            // least SALT_USE_DELAY seconds.
            let best = self
                .salts
                .iter()
                .rev()
                .find(|s| s.valid_since + SALT_USE_DELAY <= now)
                .map(|s| s.salt);
            if let Some(salt) = best {
                if salt != self.enc.salt {
                    tracing::debug!(
                        "[layer] proactive salt cycle: {:#x} → {:#x}",
                        self.enc.salt,
                        salt
                    );
                    self.enc.salt = salt;
                    // Keep only salts whose window started no earlier than
                    // SALT_USE_DELAY seconds ago; this normally removes the salt that
                    // was just selected as well.
                    self.salts.retain(|s| s.valid_since >= now - SALT_USE_DELAY);
                    if self.salts.is_empty() {
                        // Never leave the list empty: keep the active salt with an
                        // unbounded validity window.
                        self.salts.push(FutureSalt {
                            valid_since: 0,
                            valid_until: i32::MAX,
                            salt,
                        });
                    }
                }
            }
        }

        self.salts.len() <= 1
    }
}
6390
/// A freshly opened connection to a DC, before it is split into writer and reader parts.
struct Connection {
    /// The underlying TCP stream, after the transport init bytes have been sent.
    stream: TcpStream,
    /// Encrypted session for this connection.
    enc: EncryptedSession,
    /// Framing negotiated for this connection.
    frame_kind: FrameKind,
}
6396
6397impl Connection {
6398 async fn open_stream(
6400 addr: &str,
6401 socks5: Option<&crate::socks5::Socks5Config>,
6402 transport: &TransportKind,
6403 dc_id: i16,
6404 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6405 let stream = match socks5 {
6406 Some(proxy) => proxy.connect(addr).await?,
6407 None => {
6408 let stream = TcpStream::connect(addr)
6409 .await
6410 .map_err(InvocationError::Io)?;
6411 stream.set_nodelay(true).ok();
6412 {
6413 let sock = socket2::SockRef::from(&stream);
6414 let keepalive = TcpKeepalive::new()
6415 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
6416 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
6417 #[cfg(not(target_os = "windows"))]
6418 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
6419 sock.set_tcp_keepalive(&keepalive).ok();
6420 }
6421 stream
6422 }
6423 };
6424 Self::apply_transport_init(stream, transport, dc_id).await
6425 }
6426
6427 async fn open_stream_mtproxy(
6430 mtproxy: &crate::proxy::MtProxyConfig,
6431 dc_id: i16,
6432 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6433 let stream = mtproxy.connect().await?;
6434 stream.set_nodelay(true).ok();
6435 Self::apply_transport_init(stream, &mtproxy.transport, dc_id).await
6436 }
6437
    /// Sends the transport-specific initialization bytes on `stream` and returns the
    /// stream together with the matching [`FrameKind`].
    ///
    /// * `Abridged` — writes the single byte `0xef`.
    /// * `Intermediate` — writes `0xee 0xee 0xee 0xee`.
    /// * `Full` — writes nothing; both sequence counters start at 0.
    /// * `Obfuscated` / `PaddedIntermediate` — writes a 64-byte random header whose last
    ///   eight bytes (protocol tag, `dc_id`, two random bytes) are encrypted, and derives
    ///   the stream cipher from that header (mixed with `secret` when present).
    /// * `FakeTls` — writes a TLS-ClientHello-shaped record whose random field is an
    ///   HMAC-SHA256 over the record keyed with `secret`, and derives the cipher from
    ///   `secret` and that HMAC.
    ///
    /// # Errors
    /// Returns I/O errors from writing to the stream, and
    /// `InvocationError::Deserialize` when random bytes cannot be obtained or the HMAC
    /// key is rejected.
    async fn apply_transport_init(
        mut stream: TcpStream,
        transport: &TransportKind,
        dc_id: i16,
    ) -> Result<(TcpStream, FrameKind), InvocationError> {
        match transport {
            TransportKind::Abridged => {
                stream.write_all(&[0xef]).await?;
                Ok((stream, FrameKind::Abridged))
            }
            TransportKind::Intermediate => {
                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
                Ok((stream, FrameKind::Intermediate))
            }
            TransportKind::Full => {
                Ok((
                    stream,
                    FrameKind::Full {
                        send_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
                        recv_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
                    },
                ))
            }
            TransportKind::Obfuscated { secret } => {
                use sha2::Digest;

                // Draw random headers until one cannot be mistaken for another protocol.
                let mut nonce = [0u8; 64];
                loop {
                    getrandom::getrandom(&mut nonce)
                        .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
                    let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
                    let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
                    // Rejected: the abridged tag byte, the ASCII words "HEAD", "POST",
                    // "GET ", the 0xee / 0xdd transport tags, the byte sequence
                    // 16 03 01 02, and an all-zero second word.
                    let bad = nonce[0] == 0xEF
                        || first == 0x44414548
                        || first == 0x54534F50
                        || first == 0x20544547
                        || first == 0xEEEEEEEE
                        || first == 0xDDDDDDDD
                        || first == 0x02010316
                        || second == 0x00000000;
                    if !bad {
                        break;
                    }
                }

                // Send direction: key = bytes 8..40, IV = bytes 40..56.
                // Receive direction: the same 48 bytes reversed, split the same way.
                let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
                let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
                let mut rev48 = nonce[8..56].to_vec();
                rev48.reverse();
                let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
                let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();

                // With a secret, each key becomes SHA-256(raw_key || secret).
                let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
                    let mut h = sha2::Sha256::new();
                    h.update(tx_raw);
                    h.update(s.as_ref());
                    let tx: [u8; 32] = h.finalize().into();

                    let mut h = sha2::Sha256::new();
                    h.update(rx_raw);
                    h.update(s.as_ref());
                    let rx: [u8; 32] = h.finalize().into();
                    (tx, rx)
                } else {
                    (tx_raw, rx_raw)
                };

                // Bytes 56..60: protocol tag (0xef ×4); bytes 60..62: dc_id, little-endian.
                nonce[56] = 0xef;
                nonce[57] = 0xef;
                nonce[58] = 0xef;
                nonce[59] = 0xef;
                let dc_bytes = dc_id.to_le_bytes();
                nonce[60] = dc_bytes[0];
                nonce[61] = dc_bytes[1];

                let mut cipher =
                    layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
                // Run the cipher over 56 throw-away bytes so that only the final eight
                // header bytes are encrypted, at stream position 56.
                let mut skip = [0u8; 56];
                cipher.encrypt(&mut skip);
                cipher.encrypt(&mut nonce[56..64]);

                stream.write_all(&nonce).await?;

                let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
                Ok((stream, FrameKind::Obfuscated { cipher: cipher_arc }))
            }
            TransportKind::PaddedIntermediate { secret } => {
                // Same header construction as `Obfuscated`, but with the 0xdd tag.
                use sha2::Digest;
                let mut nonce = [0u8; 64];
                loop {
                    getrandom::getrandom(&mut nonce)
                        .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
                    let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
                    let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
                    let bad = nonce[0] == 0xEF
                        || first == 0x44414548
                        || first == 0x54534F50
                        || first == 0x20544547
                        || first == 0xEEEEEEEE
                        || first == 0xDDDDDDDD
                        || first == 0x02010316
                        || second == 0x00000000;
                    if !bad {
                        break;
                    }
                }
                let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
                let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
                let mut rev48 = nonce[8..56].to_vec();
                rev48.reverse();
                let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
                let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
                let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
                    let mut h = sha2::Sha256::new();
                    h.update(tx_raw);
                    h.update(s.as_ref());
                    let tx: [u8; 32] = h.finalize().into();
                    let mut h = sha2::Sha256::new();
                    h.update(rx_raw);
                    h.update(s.as_ref());
                    let rx: [u8; 32] = h.finalize().into();
                    (tx, rx)
                } else {
                    (tx_raw, rx_raw)
                };
                // Bytes 56..60: protocol tag (0xdd ×4); bytes 60..62: dc_id, little-endian.
                nonce[56] = 0xdd;
                nonce[57] = 0xdd;
                nonce[58] = 0xdd;
                nonce[59] = 0xdd;
                let dc_bytes = dc_id.to_le_bytes();
                nonce[60] = dc_bytes[0];
                nonce[61] = dc_bytes[1];
                let mut cipher =
                    layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
                let mut skip = [0u8; 56];
                cipher.encrypt(&mut skip);
                cipher.encrypt(&mut nonce[56..64]);
                stream.write_all(&nonce).await?;
                let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
                Ok((stream, FrameKind::PaddedIntermediate { cipher: cipher_arc }))
            }
            TransportKind::FakeTls { secret, domain } => {
                let domain_bytes = domain.as_bytes();
                let mut session_id = [0u8; 32];
                getrandom::getrandom(&mut session_id)
                    .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;

                // Cipher-suite list: 2-byte length (4) followed by two suite ids.
                let cipher_suites: &[u8] = &[0x00, 0x04, 0x13, 0x01, 0x13, 0x02];
                // Compression list: one method, value 0.
                let compression: &[u8] = &[0x01, 0x00];
                // Server-name extension: type 0x0000, extension length, list length,
                // name type 0, name length, then the domain bytes.
                // NOTE(review): the `as u16` cast truncates domains longer than 65535 bytes.
                let sni_name_len = domain_bytes.len() as u16;
                let sni_list_len = sni_name_len + 3;
                let sni_ext_len = sni_list_len + 2;
                let mut sni_ext = Vec::new();
                sni_ext.extend_from_slice(&[0x00, 0x00]);
                sni_ext.extend_from_slice(&sni_ext_len.to_be_bytes());
                sni_ext.extend_from_slice(&sni_list_len.to_be_bytes());
                sni_ext.push(0x00);
                sni_ext.extend_from_slice(&sni_name_len.to_be_bytes());
                sni_ext.extend_from_slice(domain_bytes);
                // Three further fixed extensions (types 0x002b, 0x000a, 0x0023).
                let sup_ver: &[u8] = &[0x00, 0x2b, 0x00, 0x03, 0x02, 0x03, 0x04];
                let sup_grp: &[u8] = &[0x00, 0x0a, 0x00, 0x04, 0x00, 0x02, 0x00, 0x1d];
                let sess_tick: &[u8] = &[0x00, 0x23, 0x00, 0x00];
                let ext_body_len = sni_ext.len() + sup_ver.len() + sup_grp.len() + sess_tick.len();
                let mut extensions = Vec::new();
                extensions.extend_from_slice(&(ext_body_len as u16).to_be_bytes());
                extensions.extend_from_slice(&sni_ext);
                extensions.extend_from_slice(sup_ver);
                extensions.extend_from_slice(sup_grp);
                extensions.extend_from_slice(sess_tick);

                // Hello body: version 03 03, 32 zero bytes as a placeholder for the
                // random field, session id, suites, compression, extensions.
                let mut hello_body = Vec::new();
                hello_body.extend_from_slice(&[0x03, 0x03]);
                hello_body.extend_from_slice(&[0u8; 32]);
                hello_body.push(session_id.len() as u8);
                hello_body.extend_from_slice(&session_id);
                hello_body.extend_from_slice(cipher_suites);
                hello_body.extend_from_slice(compression);
                hello_body.extend_from_slice(&extensions);

                // Handshake header: type 0x01 followed by a 24-bit big-endian length.
                let hs_len = hello_body.len() as u32;
                let mut handshake = Vec::new();
                handshake.push(0x01);
                handshake.push(((hs_len >> 16) & 0xff) as u8);
                handshake.push(((hs_len >> 8) & 0xff) as u8);
                handshake.push((hs_len & 0xff) as u8);
                handshake.extend_from_slice(&hello_body);

                // Record header: type 0x16, version 03 01, 16-bit big-endian length.
                let rec_len = handshake.len() as u16;
                let mut record = Vec::new();
                record.push(0x16);
                record.extend_from_slice(&[0x03, 0x01]);
                record.extend_from_slice(&rec_len.to_be_bytes());
                record.extend_from_slice(&handshake);

                use sha2::Digest;
                // Offset of the random field: 5 (record header) + 4 (handshake header)
                // + 2 (version).
                let random_offset = 5 + 4 + 2;
                // HMAC-SHA256 over the record while the random field is still zeroed.
                let hmac_result: [u8; 32] = {
                    use hmac::{Hmac, Mac};
                    type HmacSha256 = Hmac<sha2::Sha256>;
                    let mut mac = HmacSha256::new_from_slice(secret)
                        .map_err(|_| InvocationError::Deserialize("HMAC key error".into()))?;
                    mac.update(&record);
                    mac.finalize().into_bytes().into()
                };
                record[random_offset..random_offset + 32].copy_from_slice(&hmac_result);
                stream.write_all(&record).await?;

                // Cipher key = SHA-256(secret || hmac); the same key and an all-zero IV
                // are used for both directions.
                let mut h = sha2::Sha256::new();
                h.update(secret.as_ref());
                h.update(&hmac_result);
                let derived: [u8; 32] = h.finalize().into();
                let iv = [0u8; 16];
                let cipher =
                    layer_crypto::ObfuscatedCipher::from_keys(&derived, &iv, &derived, &iv);
                let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
                Ok((stream, FrameKind::FakeTls { cipher: cipher_arc }))
            }
        }
    }
6687
6688 async fn connect_raw(
6689 addr: &str,
6690 socks5: Option<&crate::socks5::Socks5Config>,
6691 mtproxy: Option<&crate::proxy::MtProxyConfig>,
6692 transport: &TransportKind,
6693 dc_id: i16,
6694 ) -> Result<Self, InvocationError> {
6695 tracing::debug!("[layer] Connecting to {addr} (DH) …");
6696
6697 let addr2 = addr.to_string();
6698 let socks5_c = socks5.cloned();
6699 let mtproxy_c = mtproxy.cloned();
6700 let transport_c = transport.clone();
6701
6702 let fut = async move {
6703 let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
6704 Self::open_stream_mtproxy(mp, dc_id).await?
6705 } else {
6706 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
6707 };
6708
6709 let mut plain = Session::new();
6710
6711 let (req1, s1) =
6712 auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6713 send_frame(
6714 &mut stream,
6715 &plain.pack(&req1).to_plaintext_bytes(),
6716 &frame_kind,
6717 )
6718 .await?;
6719 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
6720
6721 let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
6722 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6723 send_frame(
6724 &mut stream,
6725 &plain.pack(&req2).to_plaintext_bytes(),
6726 &frame_kind,
6727 )
6728 .await?;
6729 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
6730
6731 let (req3, s3) =
6732 auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6733 send_frame(
6734 &mut stream,
6735 &plain.pack(&req3).to_plaintext_bytes(),
6736 &frame_kind,
6737 )
6738 .await?;
6739 let ans: tl::enums::SetClientDhParamsAnswer =
6740 recv_frame_plain(&mut stream, &frame_kind).await?;
6741
6742 let done = {
6744 let mut result = auth::finish(s3, ans)
6745 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6746 let mut attempts = 0u8;
6747 loop {
6748 match result {
6749 auth::FinishResult::Done(d) => break d,
6750 auth::FinishResult::Retry {
6751 retry_id,
6752 dh_params,
6753 nonce,
6754 server_nonce,
6755 new_nonce,
6756 } => {
6757 attempts += 1;
6758 if attempts >= 5 {
6759 return Err(InvocationError::Deserialize(
6760 "dh_gen_retry exceeded 5 attempts".into(),
6761 ));
6762 }
6763 let (req_retry, s3_retry) = auth::retry_step3(
6764 &dh_params,
6765 nonce,
6766 server_nonce,
6767 new_nonce,
6768 retry_id,
6769 )
6770 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6771 send_frame(
6772 &mut stream,
6773 &plain.pack(&req_retry).to_plaintext_bytes(),
6774 &frame_kind,
6775 )
6776 .await?;
6777 let ans_retry: tl::enums::SetClientDhParamsAnswer =
6778 recv_frame_plain(&mut stream, &frame_kind).await?;
6779 result = auth::finish(s3_retry, ans_retry)
6780 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6781 }
6782 }
6783 }
6784 };
6785 tracing::debug!("[layer] DH complete ✓");
6786
6787 Ok::<Self, InvocationError>(Self {
6788 stream,
6789 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
6790 frame_kind,
6791 })
6792 };
6793
6794 tokio::time::timeout(Duration::from_secs(15), fut)
6795 .await
6796 .map_err(|_| {
6797 InvocationError::Deserialize(format!(
6798 "DH handshake with {addr} timed out after 15 s"
6799 ))
6800 })?
6801 }
6802
6803 async fn connect_with_key(
6804 addr: &str,
6805 auth_key: [u8; 256],
6806 first_salt: i64,
6807 time_offset: i32,
6808 socks5: Option<&crate::socks5::Socks5Config>,
6809 transport: &TransportKind,
6810 dc_id: i16,
6811 ) -> Result<Self, InvocationError> {
6812 let addr2 = addr.to_string();
6813 let socks5_c = socks5.cloned();
6814 let transport_c = transport.clone();
6815
6816 let fut = async move {
6817 let (stream, frame_kind) =
6818 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?;
6819 Ok::<Self, InvocationError>(Self {
6820 stream,
6821 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
6822 frame_kind,
6823 })
6824 };
6825
6826 tokio::time::timeout(Duration::from_secs(15), fut)
6827 .await
6828 .map_err(|_| {
6829 InvocationError::Deserialize(format!(
6830 "connect_with_key to {addr} timed out after 15 s"
6831 ))
6832 })?
6833 }
6834
6835 fn auth_key_bytes(&self) -> [u8; 256] {
6836 self.enc.auth_key_bytes()
6837 }
6838
6839 fn into_writer(self) -> (ConnectionWriter, OwnedWriteHalf, OwnedReadHalf, FrameKind) {
6841 let (read_half, write_half) = self.stream.into_split();
6842 let writer = ConnectionWriter {
6843 enc: self.enc,
6844 frame_kind: self.frame_kind.clone(),
6845 pending_ack: Vec::new(),
6846 sent_bodies: std::collections::HashMap::new(),
6847 container_map: std::collections::HashMap::new(),
6848 salts: Vec::new(),
6849 start_salt_time: None,
6850 };
6851 (writer, write_half, read_half, self.frame_kind)
6852 }
6853}
6854
6855async fn send_frame(
6859 stream: &mut TcpStream,
6860 data: &[u8],
6861 kind: &FrameKind,
6862) -> Result<(), InvocationError> {
6863 match kind {
6864 FrameKind::Abridged => send_abridged(stream, data).await,
6865 FrameKind::Intermediate => {
6866 let mut frame = Vec::with_capacity(4 + data.len());
6867 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
6868 frame.extend_from_slice(data);
6869 stream.write_all(&frame).await?;
6870 Ok(())
6871 }
6872 FrameKind::Full { send_seqno, .. } => {
6873 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6876 let total_len = (data.len() as u32) + 12;
6877 let mut packet = Vec::with_capacity(total_len as usize);
6878 packet.extend_from_slice(&total_len.to_le_bytes());
6879 packet.extend_from_slice(&seq.to_le_bytes());
6880 packet.extend_from_slice(data);
6881 let crc = crate::transport_intermediate::crc32_ieee(&packet);
6882 packet.extend_from_slice(&crc.to_le_bytes());
6883 stream.write_all(&packet).await?;
6884 Ok(())
6885 }
6886 FrameKind::Obfuscated { cipher } => {
6887 let words = data.len() / 4;
6889 let mut frame = if words < 0x7f {
6890 let mut v = Vec::with_capacity(1 + data.len());
6891 v.push(words as u8);
6892 v
6893 } else {
6894 let mut v = Vec::with_capacity(4 + data.len());
6895 v.extend_from_slice(&[
6896 0x7f,
6897 (words & 0xff) as u8,
6898 ((words >> 8) & 0xff) as u8,
6899 ((words >> 16) & 0xff) as u8,
6900 ]);
6901 v
6902 };
6903 frame.extend_from_slice(data);
6904 cipher.lock().await.encrypt(&mut frame);
6905 stream.write_all(&frame).await?;
6906 Ok(())
6907 }
6908 FrameKind::PaddedIntermediate { cipher } => {
6909 let mut pad_len_buf = [0u8; 1];
6911 getrandom::getrandom(&mut pad_len_buf).ok();
6912 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
6913 let total_payload = data.len() + pad_len;
6914 let mut frame = Vec::with_capacity(4 + total_payload);
6915 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
6916 frame.extend_from_slice(data);
6917 let mut pad = vec![0u8; pad_len];
6918 getrandom::getrandom(&mut pad).ok();
6919 frame.extend_from_slice(&pad);
6920 cipher.lock().await.encrypt(&mut frame);
6921 stream.write_all(&frame).await?;
6922 Ok(())
6923 }
6924 FrameKind::FakeTls { cipher } => {
6925 const TLS_APP_DATA: u8 = 0x17;
6929 const TLS_VER: [u8; 2] = [0x03, 0x03];
6930 const CHUNK: usize = 2878;
6932 let mut locked = cipher.lock().await;
6933 for chunk in data.chunks(CHUNK) {
6934 let chunk_len = chunk.len() as u16;
6935 let mut record = Vec::with_capacity(5 + chunk.len());
6936 record.push(TLS_APP_DATA);
6937 record.extend_from_slice(&TLS_VER);
6938 record.extend_from_slice(&chunk_len.to_be_bytes());
6939 record.extend_from_slice(chunk);
6940 locked.encrypt(&mut record[5..]);
6942 stream.write_all(&record).await?;
6943 }
6944 Ok(())
6945 }
6946 }
6947}
6948
6949enum FrameOutcome {
6953 Frame(Vec<u8>),
6954 Error(InvocationError),
6955 Keepalive, }
6957
6958async fn recv_frame_with_keepalive(
6965 rh: &mut OwnedReadHalf,
6966 fk: &FrameKind,
6967 client: &Client,
6968 _ak: &[u8; 256],
6969) -> FrameOutcome {
6970 match tokio::time::timeout(
6971 Duration::from_secs(PING_DELAY_SECS),
6972 recv_frame_read(rh, fk),
6973 )
6974 .await
6975 {
6976 Ok(Ok(raw)) => FrameOutcome::Frame(raw),
6977 Ok(Err(e)) => FrameOutcome::Error(e),
6978 Err(_) => {
6979 let ping_req = tl::functions::PingDelayDisconnect {
6983 ping_id: random_i64(),
6984 disconnect_delay: NO_PING_DISCONNECT,
6985 };
6986 let (wire, fk) = {
6987 let mut w = client.inner.writer.lock().await;
6988 let fk = w.frame_kind.clone();
6989 (w.enc.pack(&ping_req), fk)
6990 };
6991 match send_frame_write(&mut *client.inner.write_half.lock().await, &wire, &fk).await {
6992 Ok(()) => FrameOutcome::Keepalive,
6993 Err(e) => FrameOutcome::Error(e),
6994 }
6995 }
6996 }
6997}
6998
6999async fn send_frame_write(
7006 stream: &mut OwnedWriteHalf,
7007 data: &[u8],
7008 kind: &FrameKind,
7009) -> Result<(), InvocationError> {
7010 match kind {
7011 FrameKind::Abridged => {
7012 let words = data.len() / 4;
7013 let mut frame = if words < 0x7f {
7015 let mut v = Vec::with_capacity(1 + data.len());
7016 v.push(words as u8);
7017 v
7018 } else {
7019 let mut v = Vec::with_capacity(4 + data.len());
7020 v.extend_from_slice(&[
7021 0x7f,
7022 (words & 0xff) as u8,
7023 ((words >> 8) & 0xff) as u8,
7024 ((words >> 16) & 0xff) as u8,
7025 ]);
7026 v
7027 };
7028 frame.extend_from_slice(data);
7029 stream.write_all(&frame).await?;
7030 Ok(())
7031 }
7032 FrameKind::Intermediate => {
7033 let mut frame = Vec::with_capacity(4 + data.len());
7034 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
7035 frame.extend_from_slice(data);
7036 stream.write_all(&frame).await?;
7037 Ok(())
7038 }
7039 FrameKind::Full { send_seqno, .. } => {
7040 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7042 let total_len = (data.len() as u32) + 12;
7043 let mut packet = Vec::with_capacity(total_len as usize);
7044 packet.extend_from_slice(&total_len.to_le_bytes());
7045 packet.extend_from_slice(&seq.to_le_bytes());
7046 packet.extend_from_slice(data);
7047 let crc = crate::transport_intermediate::crc32_ieee(&packet);
7048 packet.extend_from_slice(&crc.to_le_bytes());
7049 stream.write_all(&packet).await?;
7050 Ok(())
7051 }
7052 FrameKind::Obfuscated { cipher } => {
7053 let words = data.len() / 4;
7055 let mut frame = if words < 0x7f {
7056 let mut v = Vec::with_capacity(1 + data.len());
7057 v.push(words as u8);
7058 v
7059 } else {
7060 let mut v = Vec::with_capacity(4 + data.len());
7061 v.extend_from_slice(&[
7062 0x7f,
7063 (words & 0xff) as u8,
7064 ((words >> 8) & 0xff) as u8,
7065 ((words >> 16) & 0xff) as u8,
7066 ]);
7067 v
7068 };
7069 frame.extend_from_slice(data);
7070 cipher.lock().await.encrypt(&mut frame);
7071 stream.write_all(&frame).await?;
7072 Ok(())
7073 }
7074 FrameKind::PaddedIntermediate { cipher } => {
7075 let mut pad_len_buf = [0u8; 1];
7076 getrandom::getrandom(&mut pad_len_buf).ok();
7077 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
7078 let total_payload = data.len() + pad_len;
7079 let mut frame = Vec::with_capacity(4 + total_payload);
7080 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
7081 frame.extend_from_slice(data);
7082 let mut pad = vec![0u8; pad_len];
7083 getrandom::getrandom(&mut pad).ok();
7084 frame.extend_from_slice(&pad);
7085 cipher.lock().await.encrypt(&mut frame);
7086 stream.write_all(&frame).await?;
7087 Ok(())
7088 }
7089 FrameKind::FakeTls { cipher } => {
7090 const TLS_APP_DATA: u8 = 0x17;
7091 const TLS_VER: [u8; 2] = [0x03, 0x03];
7092 const CHUNK: usize = 2878;
7093 let mut locked = cipher.lock().await;
7094 for chunk in data.chunks(CHUNK) {
7095 let chunk_len = chunk.len() as u16;
7096 let mut record = Vec::with_capacity(5 + chunk.len());
7097 record.push(TLS_APP_DATA);
7098 record.extend_from_slice(&TLS_VER);
7099 record.extend_from_slice(&chunk_len.to_be_bytes());
7100 record.extend_from_slice(chunk);
7101 locked.encrypt(&mut record[5..]);
7102 stream.write_all(&record).await?;
7103 }
7104 Ok(())
7105 }
7106 }
7107}
7108
7109async fn recv_frame_read(
7111 stream: &mut OwnedReadHalf,
7112 kind: &FrameKind,
7113) -> Result<Vec<u8>, InvocationError> {
7114 match kind {
7115 FrameKind::Abridged => {
7116 let mut h = [0u8; 1];
7117 stream.read_exact(&mut h).await?;
7118 let words = if h[0] < 0x7f {
7119 h[0] as usize
7120 } else {
7121 let mut b = [0u8; 3];
7122 stream.read_exact(&mut b).await?;
7123 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7124 };
7125 let len = words * 4;
7126 let mut buf = vec![0u8; len];
7127 stream.read_exact(&mut buf).await?;
7128 if buf.len() == 4 {
7129 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
7130 if code < 0 {
7131 return Err(InvocationError::Rpc(RpcError::from_telegram(
7132 code,
7133 "transport error",
7134 )));
7135 }
7136 }
7137 Ok(buf)
7138 }
7139 FrameKind::Intermediate => {
7140 let mut len_buf = [0u8; 4];
7141 stream.read_exact(&mut len_buf).await?;
7142 let len_i32 = i32::from_le_bytes(len_buf);
7143 if len_i32 < 0 {
7144 return Err(InvocationError::Rpc(RpcError::from_telegram(
7145 len_i32,
7146 "transport error",
7147 )));
7148 }
7149 if len_i32 <= 4 {
7150 let mut code_buf = [0u8; 4];
7151 stream.read_exact(&mut code_buf).await?;
7152 let code = i32::from_le_bytes(code_buf);
7153 return Err(InvocationError::Rpc(RpcError::from_telegram(
7154 code,
7155 "transport error",
7156 )));
7157 }
7158 let len = len_i32 as usize;
7159 let mut buf = vec![0u8; len];
7160 stream.read_exact(&mut buf).await?;
7161 Ok(buf)
7162 }
7163 FrameKind::Full { recv_seqno, .. } => {
7164 let mut len_buf = [0u8; 4];
7165 stream.read_exact(&mut len_buf).await?;
7166 let total_len_i32 = i32::from_le_bytes(len_buf);
7167 if total_len_i32 < 0 {
7168 return Err(InvocationError::Rpc(RpcError::from_telegram(
7169 total_len_i32,
7170 "transport error",
7171 )));
7172 }
7173 let total_len = total_len_i32 as usize;
7174 if total_len < 12 {
7175 return Err(InvocationError::Deserialize(
7176 "Full transport: packet too short".into(),
7177 ));
7178 }
7179 let mut rest = vec![0u8; total_len - 4];
7180 stream.read_exact(&mut rest).await?;
7181 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
7182 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
7183 let mut check_input = Vec::with_capacity(4 + body.len());
7184 check_input.extend_from_slice(&len_buf);
7185 check_input.extend_from_slice(body);
7186 let actual_crc = crate::transport_intermediate::crc32_ieee(&check_input);
7187 if actual_crc != expected_crc {
7188 return Err(InvocationError::Deserialize(format!(
7189 "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
7190 )));
7191 }
7192 let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
7193 let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7194 if recv_seq != expected_seq {
7195 return Err(InvocationError::Deserialize(format!(
7196 "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
7197 )));
7198 }
7199 Ok(body[4..].to_vec())
7200 }
7201 FrameKind::Obfuscated { cipher } => {
7202 let mut h = [0u8; 1];
7203 stream.read_exact(&mut h).await?;
7204 cipher.lock().await.decrypt(&mut h);
7205 let words = if h[0] < 0x7f {
7206 h[0] as usize
7207 } else {
7208 let mut b = [0u8; 3];
7209 stream.read_exact(&mut b).await?;
7210 cipher.lock().await.decrypt(&mut b);
7211 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7212 };
7213 let mut buf = vec![0u8; words * 4];
7214 stream.read_exact(&mut buf).await?;
7215 cipher.lock().await.decrypt(&mut buf);
7216 if buf.len() == 4 {
7217 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
7218 if code < 0 {
7219 return Err(InvocationError::Rpc(RpcError::from_telegram(
7220 code,
7221 "transport error",
7222 )));
7223 }
7224 }
7225 Ok(buf)
7226 }
7227 FrameKind::PaddedIntermediate { cipher } => {
7228 let mut len_buf = [0u8; 4];
7230 stream.read_exact(&mut len_buf).await?;
7231 cipher.lock().await.decrypt(&mut len_buf);
7232 let total_len = i32::from_le_bytes(len_buf);
7233 if total_len < 0 {
7234 return Err(InvocationError::Rpc(RpcError::from_telegram(
7235 total_len,
7236 "transport error",
7237 )));
7238 }
7239 let mut buf = vec![0u8; total_len as usize];
7240 stream.read_exact(&mut buf).await?;
7241 cipher.lock().await.decrypt(&mut buf);
7242 Ok(buf)
7246 }
7247 FrameKind::FakeTls { cipher } => {
7248 let mut hdr = [0u8; 5];
7250 stream.read_exact(&mut hdr).await?;
7251 if hdr[0] != 0x17 {
7252 return Err(InvocationError::Deserialize(format!(
7253 "FakeTLS: unexpected record type 0x{:02x}",
7254 hdr[0]
7255 )));
7256 }
7257 let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
7258 let mut buf = vec![0u8; payload_len];
7259 stream.read_exact(&mut buf).await?;
7260 cipher.lock().await.decrypt(&mut buf);
7261 Ok(buf)
7262 }
7263 }
7264}
7265
7266async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
7268 let words = data.len() / 4;
7269 let mut frame = if words < 0x7f {
7271 let mut v = Vec::with_capacity(1 + data.len());
7272 v.push(words as u8);
7273 v
7274 } else {
7275 let mut v = Vec::with_capacity(4 + data.len());
7276 v.extend_from_slice(&[
7277 0x7f,
7278 (words & 0xff) as u8,
7279 ((words >> 8) & 0xff) as u8,
7280 ((words >> 16) & 0xff) as u8,
7281 ]);
7282 v
7283 };
7284 frame.extend_from_slice(data);
7285 stream.write_all(&frame).await?;
7286 Ok(())
7287}
7288
7289async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
7290 let mut h = [0u8; 1];
7291 stream.read_exact(&mut h).await?;
7292 let words = if h[0] < 0x7f {
7293 h[0] as usize
7294 } else {
7295 let mut b = [0u8; 3];
7296 stream.read_exact(&mut b).await?;
7297 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
7298 if w == 1 {
7300 let mut code_buf = [0u8; 4];
7301 stream.read_exact(&mut code_buf).await?;
7302 let code = i32::from_le_bytes(code_buf);
7303 return Err(InvocationError::Rpc(RpcError::from_telegram(
7304 code,
7305 "transport error",
7306 )));
7307 }
7308 w
7309 };
7310 if words == 0 || words > 0x8000 {
7313 return Err(InvocationError::Deserialize(format!(
7314 "abridged: implausible word count {words} (possible transport error or framing mismatch)"
7315 )));
7316 }
7317 let mut buf = vec![0u8; words * 4];
7318 stream.read_exact(&mut buf).await?;
7319 Ok(buf)
7320}
7321
7322async fn recv_frame_plain<T: Deserializable>(
7324 stream: &mut TcpStream,
7325 kind: &FrameKind,
7326) -> Result<T, InvocationError> {
7327 let raw = match kind {
7334 FrameKind::Abridged => recv_abridged(stream).await?,
7335 FrameKind::Intermediate => {
7336 let mut len_buf = [0u8; 4];
7337 stream.read_exact(&mut len_buf).await?;
7338 let len = u32::from_le_bytes(len_buf) as usize;
7339 if len == 0 || len > 1 << 24 {
7340 return Err(InvocationError::Deserialize(format!(
7341 "plaintext frame: implausible length {len}"
7342 )));
7343 }
7344 let mut buf = vec![0u8; len];
7345 stream.read_exact(&mut buf).await?;
7346 buf
7347 }
7348 FrameKind::Full { recv_seqno, .. } => {
7349 let mut len_buf = [0u8; 4];
7351 stream.read_exact(&mut len_buf).await?;
7352 let total_len = u32::from_le_bytes(len_buf) as usize;
7353 if total_len < 12 || total_len > (1 << 24) + 12 {
7354 return Err(InvocationError::Deserialize(format!(
7355 "Full plaintext frame: implausible total_len {total_len}"
7356 )));
7357 }
7358 let mut rest = vec![0u8; total_len - 4];
7359 stream.read_exact(&mut rest).await?;
7360
7361 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
7363 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
7364 let mut check_input = Vec::with_capacity(4 + body.len());
7365 check_input.extend_from_slice(&len_buf);
7366 check_input.extend_from_slice(body);
7367 let actual_crc = crate::transport_intermediate::crc32_ieee(&check_input);
7368 if actual_crc != expected_crc {
7369 return Err(InvocationError::Deserialize(format!(
7370 "Full plaintext: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
7371 )));
7372 }
7373
7374 let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
7376 let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7377 if recv_seq != expected_seq {
7378 return Err(InvocationError::Deserialize(format!(
7379 "Full plaintext: seqno mismatch (got {recv_seq}, expected {expected_seq})"
7380 )));
7381 }
7382
7383 body[4..].to_vec()
7384 }
7385 FrameKind::Obfuscated { cipher } => {
7386 let mut h = [0u8; 1];
7388 stream.read_exact(&mut h).await?;
7389 cipher.lock().await.decrypt(&mut h);
7390 let words = if h[0] < 0x7f {
7391 h[0] as usize
7392 } else {
7393 let mut b = [0u8; 3];
7394 stream.read_exact(&mut b).await?;
7395 cipher.lock().await.decrypt(&mut b);
7396 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7397 };
7398 let mut buf = vec![0u8; words * 4];
7399 stream.read_exact(&mut buf).await?;
7400 cipher.lock().await.decrypt(&mut buf);
7401 buf
7402 }
7403 FrameKind::PaddedIntermediate { cipher } => {
7404 let mut len_buf = [0u8; 4];
7405 stream.read_exact(&mut len_buf).await?;
7406 cipher.lock().await.decrypt(&mut len_buf);
7407 let len = u32::from_le_bytes(len_buf) as usize;
7408 if len == 0 || len > 1 << 24 {
7409 return Err(InvocationError::Deserialize(format!(
7410 "PaddedIntermediate plaintext: implausible length {len}"
7411 )));
7412 }
7413 let mut buf = vec![0u8; len];
7414 stream.read_exact(&mut buf).await?;
7415 cipher.lock().await.decrypt(&mut buf);
7416 buf
7417 }
7418 FrameKind::FakeTls { cipher } => {
7419 let mut hdr = [0u8; 5];
7420 stream.read_exact(&mut hdr).await?;
7421 if hdr[0] != 0x17 {
7422 return Err(InvocationError::Deserialize(format!(
7423 "FakeTLS plaintext: unexpected record type 0x{:02x}",
7424 hdr[0]
7425 )));
7426 }
7427 let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
7428 let mut buf = vec![0u8; payload_len];
7429 stream.read_exact(&mut buf).await?;
7430 cipher.lock().await.decrypt(&mut buf);
7431 buf
7432 }
7433 };
7434 if raw.len() < 20 {
7435 return Err(InvocationError::Deserialize(
7436 "plaintext frame too short".into(),
7437 ));
7438 }
7439 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
7440 return Err(InvocationError::Deserialize(
7441 "expected auth_key_id=0 in plaintext".into(),
7442 ));
7443 }
7444 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
7445 if 20 + body_len > raw.len() {
7446 return Err(InvocationError::Deserialize(
7447 "plaintext frame: body_len exceeds frame size".into(),
7448 ));
7449 }
7450 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
7451 T::deserialize(&mut cur).map_err(Into::into)
7452}
7453
/// What `unwrap_envelope` extracted from a message body.
enum EnvelopeResult {
    /// A body whose constructor is not handled specially, returned as is.
    Payload(Vec<u8>),
    /// One or more raw update bodies, each still starting with its constructor ID.
    RawUpdates(Vec<Vec<u8>>),
    /// `pts` and `pts_count` taken from an `updateShortSentMessage`.
    Pts(i32, i32),
    /// Nothing to hand to the caller.
    None,
}
7464
7465fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
7466 if body.len() < 4 {
7467 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
7468 }
7469 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
7470
7471 match cid {
7472 ID_RPC_RESULT => {
7473 if body.len() < 12 {
7474 return Err(InvocationError::Deserialize("rpc_result too short".into()));
7475 }
7476 unwrap_envelope(body[12..].to_vec())
7477 }
7478 ID_RPC_ERROR => {
7479 if body.len() < 8 {
7480 return Err(InvocationError::Deserialize("rpc_error too short".into()));
7481 }
7482 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
7483 let message = tl_read_string(&body[8..]).unwrap_or_default();
7484 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
7485 }
7486 ID_MSG_CONTAINER => {
7487 if body.len() < 8 {
7488 return Err(InvocationError::Deserialize("container too short".into()));
7489 }
7490 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
7491 let mut pos = 8usize;
7492 let mut payload: Option<Vec<u8>> = None;
7493 let mut raw_updates: Vec<Vec<u8>> = Vec::new();
7494
7495 for _ in 0..count {
7496 if pos + 16 > body.len() { break; }
7497 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
7498 pos += 16;
7499 if pos + inner_len > body.len() { break; }
7500 let inner = body[pos..pos + inner_len].to_vec();
7501 pos += inner_len;
7502 match unwrap_envelope(inner)? {
7503 EnvelopeResult::Payload(p) => { payload = Some(p); }
7504 EnvelopeResult::RawUpdates(mut raws) => { raw_updates.append(&mut raws); }
7505 EnvelopeResult::Pts(_, _) => {} EnvelopeResult::None => {}
7507 }
7508 }
7509 if let Some(p) = payload {
7510 Ok(EnvelopeResult::Payload(p))
7511 } else if !raw_updates.is_empty() {
7512 Ok(EnvelopeResult::RawUpdates(raw_updates))
7513 } else {
7514 Ok(EnvelopeResult::None)
7515 }
7516 }
7517 ID_GZIP_PACKED => {
7518 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
7519 unwrap_envelope(gz_inflate(&bytes)?)
7520 }
7521 ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
7526 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
7539 Ok(EnvelopeResult::None)
7540 }
7541 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
7547 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
7548 | ID_UPDATES_TOO_LONG => {
7549 Ok(EnvelopeResult::RawUpdates(vec![body]))
7550 }
7551 ID_UPDATE_SHORT_SENT_MSG => {
7558 let mut cur = Cursor::from_slice(&body[4..]);
7559 match tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
7560 Ok(m) => {
7561 tracing::debug!(
7562 "[layer] updateShortSentMessage (RPC): pts={} pts_count={}: advancing pts",
7563 m.pts, m.pts_count
7564 );
7565 Ok(EnvelopeResult::Pts(m.pts, m.pts_count))
7566 }
7567 Err(e) => {
7568 tracing::debug!("[layer] updateShortSentMessage deserialize error: {e}");
7569 Ok(EnvelopeResult::None)
7570 }
7571 }
7572 }
7573 _ => Ok(EnvelopeResult::Payload(body)),
7574 }
7575}
7576
7577fn random_i64() -> i64 {
7580 let mut b = [0u8; 8];
7581 getrandom::getrandom(&mut b).expect("getrandom");
7582 i64::from_le_bytes(b)
7583}
7584
7585fn jitter_delay(base_ms: u64) -> Duration {
7589 let mut b = [0u8; 2];
7591 getrandom::getrandom(&mut b).unwrap_or(());
7592 let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; let factor = 0.80 + rand_frac * 0.40; Duration::from_millis((base_ms as f64 * factor) as u64)
7595}
7596
/// Decodes a length-prefixed byte string from the start of `data`.
///
/// A first byte below 254 is the length itself and the content starts at
/// offset 1. Otherwise the next three bytes hold a little-endian 24-bit
/// length and the content starts at offset 4. Trailing bytes after the
/// content are ignored.
///
/// Returns `Some(vec![])` for empty input and `None` when the header or the
/// announced content does not fit in `data`.
pub(crate) fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let Some(&first) = data.first() else {
        return Some(Vec::new());
    };

    let (len, start) = match first {
        0..=253 => (usize::from(first), 1),
        _ => {
            // Long form: needs the marker plus three length bytes.
            let ext = data.get(1..4)?;
            let len = usize::from(ext[0]) | usize::from(ext[1]) << 8 | usize::from(ext[2]) << 16;
            (len, 4)
        }
    };

    data.get(start..start + len).map(|content| content.to_vec())
}
7616
7617fn tl_read_string(data: &[u8]) -> Option<String> {
7618 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
7619}
7620
7621pub(crate) fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
7622 use std::io::Read;
7623 let mut out = Vec::new();
7624 if flate2::read::GzDecoder::new(data)
7625 .read_to_end(&mut out)
7626 .is_ok()
7627 && !out.is_empty()
7628 {
7629 return Ok(out);
7630 }
7631 out.clear();
7632 flate2::read::ZlibDecoder::new(data)
7633 .read_to_end(&mut out)
7634 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
7635 Ok(out)
7636}
7637
7638pub(crate) fn maybe_gz_decompress(body: Vec<u8>) -> Result<Vec<u8>, InvocationError> {
7639 const ID_GZIP_PACKED_LOCAL: u32 = 0x3072cfa1;
7640 if body.len() >= 4 && u32::from_le_bytes(body[0..4].try_into().unwrap()) == ID_GZIP_PACKED_LOCAL
7641 {
7642 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
7643 gz_inflate(&bytes)
7644 } else {
7645 Ok(body)
7646 }
7647}
7648
/// Size limit in bytes for `maybe_gz_pack`: bodies of at most this length are
/// returned uncompressed without attempting compression.
const COMPRESSION_THRESHOLD: usize = 512;
7654
/// Encodes `data` as a length-prefixed byte string padded to a multiple of four.
///
/// Lengths below 254 use a one-byte length header. Longer inputs use `0xfe`
/// followed by the low three bytes of the length in little-endian order.
/// Zero bytes are appended until header plus content is 4-byte aligned.
fn tl_write_bytes(data: &[u8]) -> Vec<u8> {
    let len = data.len();
    let mut encoded = Vec::with_capacity(4 + len);

    if len < 254 {
        encoded.push(len as u8);
    } else {
        encoded.push(0xfe);
        // Only the low 24 bits of the length are written.
        let len_bytes = (len as u32).to_le_bytes();
        encoded.extend_from_slice(&len_bytes[..3]);
    }
    encoded.extend_from_slice(data);

    // Zero-pad up to the next 4-byte boundary.
    while encoded.len() % 4 != 0 {
        encoded.push(0);
    }
    encoded
}
7673
7674fn gz_pack_body(data: &[u8]) -> Vec<u8> {
7676 use std::io::Write;
7677 let mut enc = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
7678 let _ = enc.write_all(data);
7679 let compressed = enc.finish().unwrap_or_default();
7680 let mut out = Vec::with_capacity(4 + 4 + compressed.len());
7681 out.extend_from_slice(&ID_GZIP_PACKED.to_le_bytes());
7682 out.extend(tl_write_bytes(&compressed));
7683 out
7684}
7685
7686fn maybe_gz_pack(data: &[u8]) -> Vec<u8> {
7689 if data.len() <= COMPRESSION_THRESHOLD {
7690 return data.to_vec();
7691 }
7692 let packed = gz_pack_body(data);
7693 if packed.len() < data.len() {
7694 packed
7695 } else {
7696 data.to_vec()
7697 }
7698}
7699
7700fn build_msgs_ack_body(msg_ids: &[i64]) -> Vec<u8> {
7704 let mut out = Vec::with_capacity(4 + 4 + 4 + msg_ids.len() * 8);
7707 out.extend_from_slice(&ID_MSGS_ACK.to_le_bytes());
7708 out.extend_from_slice(&0x1cb5c415_u32.to_le_bytes()); out.extend_from_slice(&(msg_ids.len() as u32).to_le_bytes());
7710 for &id in msg_ids {
7711 out.extend_from_slice(&id.to_le_bytes());
7712 }
7713 out
7714}
7715
7716fn build_container_body(messages: &[(i64, i32, &[u8])]) -> Vec<u8> {
7724 let total_body: usize = messages.iter().map(|(_, _, b)| 16 + b.len()).sum();
7725 let mut out = Vec::with_capacity(8 + total_body);
7726 out.extend_from_slice(&ID_MSG_CONTAINER.to_le_bytes());
7727 out.extend_from_slice(&(messages.len() as u32).to_le_bytes());
7728 for &(msg_id, seqno, body) in messages {
7729 out.extend_from_slice(&msg_id.to_le_bytes());
7730 out.extend_from_slice(&seqno.to_le_bytes());
7731 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
7732 out.extend_from_slice(body);
7733 }
7734 out
7735}