1#![cfg_attr(docsrs, feature(doc_cfg))]
20#![doc(html_root_url = "https://docs.rs/layer-client/0.5.0")]
21#![deny(unsafe_code)]
41
42pub mod builder;
43mod errors;
44pub mod media;
45pub mod parsers;
46pub mod participants;
47pub mod pts;
48mod restart;
49mod retry;
50mod session;
51mod transport;
52mod two_factor_auth;
53pub mod update;
54
55pub mod dc_pool;
56pub mod inline_iter;
57pub mod keyboard;
58pub mod search;
59pub mod session_backend;
60pub mod socks5;
61pub mod transport_intermediate;
62pub mod transport_obfuscated;
63pub mod types;
64pub mod typing_guard;
65
66#[macro_use]
67pub mod macros;
68pub mod peer_ref;
69pub mod reactions;
70
71#[cfg(test)]
72mod pts_tests;
73
74pub mod dc_migration;
75pub mod proxy;
76
77pub use builder::{BuilderError, ClientBuilder};
78pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
79pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
80pub use media::{Document, DownloadIter, Downloadable, Photo, Sticker, UploadedFile};
81pub use participants::{Participant, ProfilePhotoIter};
82pub use peer_ref::PeerRef;
83pub use proxy::{MtProxyConfig, parse_proxy_link};
84pub use restart::{ConnectionRestartPolicy, FixedInterval, NeverRestart};
85use retry::RetryLoop;
86pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
87pub use search::{GlobalSearchBuilder, SearchBuilder};
88pub use session::{DcEntry, DcFlags};
89#[cfg(feature = "libsql-session")]
90#[cfg_attr(docsrs, doc(cfg(feature = "libsql-session")))]
91pub use session_backend::LibSqlBackend;
92#[cfg(feature = "sqlite-session")]
93#[cfg_attr(docsrs, doc(cfg(feature = "sqlite-session")))]
94pub use session_backend::SqliteBackend;
95pub use session_backend::{
96 BinaryFileBackend, InMemoryBackend, SessionBackend, StringSessionBackend, UpdateStateChange,
97};
98pub use socks5::Socks5Config;
99pub use types::ChannelKind;
100pub use types::{Channel, Chat, Group, User};
101pub use typing_guard::TypingGuard;
102pub use update::Update;
103pub use update::{ChatActionUpdate, UserStatusUpdate};
104
105pub use layer_tl_types as tl;
108
109use std::collections::HashMap;
110use std::collections::VecDeque;
111use std::num::NonZeroU32;
112use std::ops::ControlFlow;
113use std::sync::Arc;
114use std::time::Duration;
115
116use layer_mtproto::{EncryptedSession, Session, authentication as auth};
117use layer_tl_types::{Cursor, Deserializable, RemoteCall};
118use session::PersistedSession;
119use socket2::TcpKeepalive;
120use tokio::io::{AsyncReadExt, AsyncWriteExt};
121use tokio::net::TcpStream;
122use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
123use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
124use tokio::time::sleep;
125use tokio_util::sync::CancellationToken;
126
127const ID_RPC_RESULT: u32 = 0xf35c6d01;
130const ID_RPC_ERROR: u32 = 0x2144ca19;
131const ID_MSG_CONTAINER: u32 = 0x73f1f8dc;
132const ID_GZIP_PACKED: u32 = 0x3072cfa1;
133const ID_PONG: u32 = 0x347773c5;
134const ID_MSGS_ACK: u32 = 0x62d6b459;
135const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
136const ID_NEW_SESSION: u32 = 0x9ec20908;
137const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811;
138const ID_FUTURE_SALTS: u32 = 0xae500895;
140const ID_MSG_DETAILED_INFO: u32 = 0x276d3ec6;
142const ID_MSG_NEW_DETAIL_INFO: u32 = 0x809db6df;
143const ID_MSG_RESEND_REQ: u32 = 0x7d861a08;
145const ID_UPDATES: u32 = 0x74ae4240;
146const ID_UPDATE_SHORT: u32 = 0x78d4dec1;
147const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
148const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
149const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
150const ID_UPDATE_SHORT_SENT_MSG: u32 = 0x9015e101;
151const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e;
152
153const PING_DELAY_SECS: u64 = 60;
159
160const NO_PING_DISCONNECT: i32 = 75;
164
165const RECONNECT_BASE_MS: u64 = 500;
167
168const RECONNECT_MAX_SECS: u64 = 5;
173
174const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
176const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
178const TCP_KEEPALIVE_PROBES: u32 = 3;
180
181#[derive(Default)]
189pub struct PeerCache {
190 pub users: HashMap<i64, i64>,
192 pub channels: HashMap<i64, i64>,
194}
195
196impl PeerCache {
197 fn cache_user(&mut self, user: &tl::enums::User) {
198 if let tl::enums::User::User(u) = user
199 && let Some(hash) = u.access_hash
200 {
201 self.users.insert(u.id, hash);
202 }
203 }
204
205 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
206 match chat {
207 tl::enums::Chat::Channel(c) => {
208 if let Some(hash) = c.access_hash {
209 self.channels.insert(c.id, hash);
210 }
211 }
212 tl::enums::Chat::ChannelForbidden(c) => {
213 self.channels.insert(c.id, c.access_hash);
214 }
215 _ => {}
216 }
217 }
218
219 fn cache_users(&mut self, users: &[tl::enums::User]) {
220 for u in users {
221 self.cache_user(u);
222 }
223 }
224
225 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
226 for c in chats {
227 self.cache_chat(c);
228 }
229 }
230
231 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
232 if user_id == 0 {
233 return tl::enums::InputPeer::PeerSelf;
234 }
235 let hash = self.users.get(&user_id).copied().unwrap_or_else(|| {
236 tracing::warn!("[layer] PeerCache: no access_hash for user {user_id}, using 0: may cause USER_ID_INVALID");
237 0
238 });
239 tl::enums::InputPeer::User(tl::types::InputPeerUser {
240 user_id,
241 access_hash: hash,
242 })
243 }
244
245 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
246 let hash = self.channels.get(&channel_id).copied().unwrap_or_else(|| {
247 tracing::warn!("[layer] PeerCache: no access_hash for channel {channel_id}, using 0: may cause CHANNEL_INVALID");
248 0
249 });
250 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
251 channel_id,
252 access_hash: hash,
253 })
254 }
255
256 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
257 match peer {
258 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
259 tl::enums::Peer::Chat(c) => {
260 tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id })
261 }
262 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
263 }
264 }
265}
266
267#[derive(Clone, Default)]
289pub struct InputMessage {
290 pub text: String,
291 pub reply_to: Option<i32>,
292 pub silent: bool,
293 pub background: bool,
294 pub clear_draft: bool,
295 pub no_webpage: bool,
296 pub invert_media: bool,
298 pub schedule_once_online: bool,
300 pub entities: Option<Vec<tl::enums::MessageEntity>>,
301 pub reply_markup: Option<tl::enums::ReplyMarkup>,
302 pub schedule_date: Option<i32>,
303 pub media: Option<tl::enums::InputMedia>,
306}
307
308impl InputMessage {
309 pub fn text(text: impl Into<String>) -> Self {
311 Self {
312 text: text.into(),
313 ..Default::default()
314 }
315 }
316
317 pub fn markdown(text: impl AsRef<str>) -> Self {
330 let (plain, ents) = crate::parsers::parse_markdown(text.as_ref());
331 Self {
332 text: plain,
333 entities: if ents.is_empty() { None } else { Some(ents) },
334 ..Default::default()
335 }
336 }
337
338 pub fn html(text: impl AsRef<str>) -> Self {
349 let (plain, ents) = crate::parsers::parse_html(text.as_ref());
350 Self {
351 text: plain,
352 entities: if ents.is_empty() { None } else { Some(ents) },
353 ..Default::default()
354 }
355 }
356
357 pub fn set_text(mut self, text: impl Into<String>) -> Self {
359 self.text = text.into();
360 self
361 }
362
363 pub fn reply_to(mut self, id: Option<i32>) -> Self {
365 self.reply_to = id;
366 self
367 }
368
369 pub fn silent(mut self, v: bool) -> Self {
371 self.silent = v;
372 self
373 }
374
375 pub fn background(mut self, v: bool) -> Self {
377 self.background = v;
378 self
379 }
380
381 pub fn clear_draft(mut self, v: bool) -> Self {
383 self.clear_draft = v;
384 self
385 }
386
387 pub fn no_webpage(mut self, v: bool) -> Self {
389 self.no_webpage = v;
390 self
391 }
392
393 pub fn invert_media(mut self, v: bool) -> Self {
395 self.invert_media = v;
396 self
397 }
398
399 pub fn schedule_once_online(mut self) -> Self {
404 self.schedule_once_online = true;
405 self.schedule_date = None;
406 self
407 }
408
409 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
411 self.entities = Some(e);
412 self
413 }
414
415 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
417 self.reply_markup = Some(rm);
418 self
419 }
420
421 pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
431 self.reply_markup = Some(kb.into());
432 self
433 }
434
435 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
437 self.schedule_date = ts;
438 self
439 }
440
441 pub fn copy_media(mut self, media: tl::enums::InputMedia) -> Self {
457 self.media = Some(media);
458 self
459 }
460
461 pub fn clear_media(mut self) -> Self {
463 self.media = None;
464 self
465 }
466
467 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
468 self.reply_to.map(|id| {
469 tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
470 reply_to_msg_id: id,
471 top_msg_id: None,
472 reply_to_peer_id: None,
473 quote_text: None,
474 quote_entities: None,
475 quote_offset: None,
476 monoforum_peer_id: None,
477 todo_item_id: None,
478 poll_option: None,
479 })
480 })
481 }
482}
483
484impl From<&str> for InputMessage {
485 fn from(s: &str) -> Self {
486 Self::text(s)
487 }
488}
489
490impl From<String> for InputMessage {
491 fn from(s: String) -> Self {
492 Self::text(s)
493 }
494}
495
496#[derive(Clone, Debug)]
509pub enum TransportKind {
510 Abridged,
514 Intermediate,
518 Full,
522 Obfuscated { secret: Option<[u8; 16]> },
530 PaddedIntermediate { secret: Option<[u8; 16]> },
536 FakeTls { secret: [u8; 16], domain: String },
543}
544
545impl Default for TransportKind {
546 fn default() -> Self {
547 TransportKind::Obfuscated { secret: None }
553 }
554}
555
556pub type ShutdownToken = CancellationToken;
576
577#[derive(Clone)]
579pub struct Config {
580 pub api_id: i32,
581 pub api_hash: String,
582 pub dc_addr: Option<String>,
583 pub retry_policy: Arc<dyn RetryPolicy>,
584 pub socks5: Option<crate::socks5::Socks5Config>,
586 pub mtproxy: Option<crate::proxy::MtProxyConfig>,
590 pub allow_ipv6: bool,
592 pub transport: TransportKind,
594 pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
596 pub catch_up: bool,
600 pub restart_policy: Arc<dyn ConnectionRestartPolicy>,
601 pub device_model: String,
603 pub system_version: String,
605 pub app_version: String,
607 pub system_lang_code: String,
609 pub lang_pack: String,
611 pub lang_code: String,
613}
614
615impl Config {
616 pub fn with_string_session(s: impl Into<String>) -> Self {
631 Config {
632 session_backend: Arc::new(crate::session_backend::StringSessionBackend::new(s)),
633 ..Config::default()
634 }
635 }
636
637 pub fn proxy_link(mut self, url: &str) -> Self {
655 if url.is_empty() {
656 return self;
657 }
658 let cfg = crate::proxy::parse_proxy_link(url)
659 .unwrap_or_else(|| panic!("invalid MTProxy link: {url:?}"));
660 self.mtproxy = Some(cfg);
661 self
662 }
663
664 pub fn proxy(self, host: impl Into<String>, port: u16, secret: &str) -> Self {
681 let host = host.into();
682 let url = format!("tg://proxy?server={host}&port={port}&secret={secret}");
683 self.proxy_link(&url)
684 }
685
686 pub fn socks5(mut self, addr: impl Into<String>) -> Self {
699 self.socks5 = Some(crate::socks5::Socks5Config::new(addr));
700 self
701 }
702
703 pub fn socks5_auth(
716 mut self,
717 addr: impl Into<String>,
718 username: impl Into<String>,
719 password: impl Into<String>,
720 ) -> Self {
721 self.socks5 = Some(crate::socks5::Socks5Config::with_auth(
722 addr, username, password,
723 ));
724 self
725 }
726}
727
728impl Default for Config {
729 fn default() -> Self {
730 Self {
731 api_id: 0,
732 api_hash: String::new(),
733 dc_addr: None,
734 retry_policy: Arc::new(AutoSleep::default()),
735 socks5: None,
736 mtproxy: None,
737 allow_ipv6: false,
738 transport: TransportKind::Obfuscated { secret: None },
739 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new(
740 "layer.session",
741 )),
742 catch_up: false,
743 restart_policy: Arc::new(NeverRestart),
744 device_model: "Linux".to_string(),
745 system_version: "1.0".to_string(),
746 app_version: env!("CARGO_PKG_VERSION").to_string(),
747 system_lang_code: "en".to_string(),
748 lang_pack: String::new(),
749 lang_code: "en".to_string(),
750 }
751 }
752}
753
754pub struct UpdateStream {
759 rx: mpsc::UnboundedReceiver<update::Update>,
760}
761
762impl UpdateStream {
763 pub async fn next(&mut self) -> Option<update::Update> {
765 self.rx.recv().await
766 }
767
768 pub async fn next_raw(&mut self) -> Option<update::RawUpdate> {
774 loop {
775 match self.rx.recv().await? {
776 update::Update::Raw(r) => return Some(r),
777 _ => continue,
778 }
779 }
780 }
781}
782
783#[derive(Debug, Clone)]
787pub struct Dialog {
788 pub raw: tl::enums::Dialog,
789 pub message: Option<tl::enums::Message>,
790 pub entity: Option<tl::enums::User>,
791 pub chat: Option<tl::enums::Chat>,
792}
793
794impl Dialog {
795 pub fn title(&self) -> String {
797 if let Some(tl::enums::User::User(u)) = &self.entity {
798 let first = u.first_name.as_deref().unwrap_or("");
799 let last = u.last_name.as_deref().unwrap_or("");
800 let name = format!("{first} {last}").trim().to_string();
801 if !name.is_empty() {
802 return name;
803 }
804 }
805 if let Some(chat) = &self.chat {
806 return match chat {
807 tl::enums::Chat::Chat(c) => c.title.clone(),
808 tl::enums::Chat::Forbidden(c) => c.title.clone(),
809 tl::enums::Chat::Channel(c) => c.title.clone(),
810 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
811 tl::enums::Chat::Empty(_) => "(empty)".into(),
812 };
813 }
814 "(Unknown)".to_string()
815 }
816
817 pub fn peer(&self) -> Option<&tl::enums::Peer> {
819 match &self.raw {
820 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
821 tl::enums::Dialog::Folder(_) => None,
822 }
823 }
824
825 pub fn unread_count(&self) -> i32 {
827 match &self.raw {
828 tl::enums::Dialog::Dialog(d) => d.unread_count,
829 _ => 0,
830 }
831 }
832
833 pub fn top_message(&self) -> i32 {
835 match &self.raw {
836 tl::enums::Dialog::Dialog(d) => d.top_message,
837 _ => 0,
838 }
839 }
840}
841
842struct ClientInner {
845 writer: Mutex<ConnectionWriter>,
848 write_half: Mutex<OwnedWriteHalf>,
852 #[allow(clippy::type_complexity)]
856 pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
857 reconnect_tx: mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
860 network_hint_tx: mpsc::UnboundedSender<()>,
863 #[allow(dead_code)]
865 shutdown_token: CancellationToken,
866 #[allow(dead_code)]
868 catch_up: bool,
869 restart_policy: Arc<dyn ConnectionRestartPolicy>,
870 home_dc_id: Mutex<i32>,
871 dc_options: Mutex<HashMap<i32, DcEntry>>,
872 media_dc_options: Mutex<HashMap<i32, DcEntry>>,
874 pub peer_cache: RwLock<PeerCache>,
875 pub pts_state: Mutex<pts::PtsState>,
876 pub possible_gap: Mutex<pts::PossibleGapBuffer>,
878 api_id: i32,
879 api_hash: String,
880 device_model: String,
881 system_version: String,
882 app_version: String,
883 system_lang_code: String,
884 lang_pack: String,
885 lang_code: String,
886 retry_policy: Arc<dyn RetryPolicy>,
887 socks5: Option<crate::socks5::Socks5Config>,
888 mtproxy: Option<crate::proxy::MtProxyConfig>,
889 allow_ipv6: bool,
890 transport: TransportKind,
891 session_backend: Arc<dyn crate::session_backend::SessionBackend>,
892 dc_pool: Mutex<dc_pool::DcPool>,
893 update_tx: mpsc::Sender<update::Update>,
894 pub is_bot: std::sync::atomic::AtomicBool,
898 stream_active: std::sync::atomic::AtomicBool,
900 salt_request_in_flight: std::sync::atomic::AtomicBool,
904 dh_in_progress: std::sync::atomic::AtomicBool,
908}
909
910#[derive(Clone)]
912pub struct Client {
913 pub(crate) inner: Arc<ClientInner>,
914 _update_rx: Arc<Mutex<mpsc::Receiver<update::Update>>>,
915}
916
917impl Client {
918 pub fn builder() -> crate::builder::ClientBuilder {
933 crate::builder::ClientBuilder::default()
934 }
935
936 pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
939 if config.api_id == 0 {
941 return Err(InvocationError::Deserialize(
942 "api_id must be non-zero".into(),
943 ));
944 }
945 if config.api_hash.is_empty() {
946 return Err(InvocationError::Deserialize(
947 "api_hash must not be empty".into(),
948 ));
949 }
950
951 let (update_tx, update_rx) = mpsc::channel(2048);
954
955 let socks5 = config.socks5.clone();
957 let mtproxy = config.mtproxy.clone();
958 let transport = config.transport.clone();
959
960 let (conn, home_dc_id, dc_opts, media_dc_opts, loaded_session) =
961 match config.session_backend.load().map_err(InvocationError::Io)? {
962 Some(s) => {
963 if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
964 if let Some(key) = dc.auth_key {
965 tracing::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
966 match Connection::connect_with_key(
967 &dc.addr,
968 key,
969 dc.first_salt,
970 dc.time_offset,
971 socks5.as_ref(),
972 &transport,
973 s.home_dc_id as i16,
974 )
975 .await
976 {
977 Ok(c) => {
978 let mut opts = session::default_dc_addresses()
979 .into_iter()
980 .map(|(id, addr)| {
981 (
982 id,
983 DcEntry {
984 dc_id: id,
985 addr,
986 auth_key: None,
987 first_salt: 0,
988 time_offset: 0,
989 flags: DcFlags::NONE,
990 },
991 )
992 })
993 .collect::<HashMap<_, _>>();
994 let mut media_opts: HashMap<i32, DcEntry> = HashMap::new();
995 for d in &s.dcs {
996 if d.flags.contains(DcFlags::MEDIA_ONLY)
997 || d.flags.contains(DcFlags::CDN)
998 {
999 media_opts.insert(d.dc_id, d.clone());
1000 } else {
1001 opts.insert(d.dc_id, d.clone());
1002 }
1003 }
1004 (c, s.home_dc_id, opts, media_opts, Some(s))
1005 }
1006 Err(e) => {
1007 tracing::warn!(
1013 "[layer] Session connect failed ({e}): \
1014 returning error (delete session file to reset)"
1015 );
1016 return Err(e);
1017 }
1018 }
1019 } else {
1020 let (c, dc, opts) =
1021 Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport)
1022 .await?;
1023 (c, dc, opts, HashMap::new(), None)
1024 }
1025 } else {
1026 let (c, dc, opts) =
1027 Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport)
1028 .await?;
1029 (c, dc, opts, HashMap::new(), None)
1030 }
1031 }
1032 None => {
1033 let (c, dc, opts) =
1034 Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport).await?;
1035 (c, dc, opts, HashMap::new(), None)
1036 }
1037 };
1038
1039 let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());
1041
1042 let (writer, write_half, read_half, frame_kind) = conn.into_writer();
1047 let auth_key = writer.enc.auth_key_bytes();
1048 let session_id = writer.enc.session_id();
1049
1050 #[allow(clippy::type_complexity)]
1051 let pending: Arc<
1052 Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>,
1053 > = Arc::new(Mutex::new(HashMap::new()));
1054
1055 let (reconnect_tx, reconnect_rx) =
1057 mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();
1058
1059 let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();
1062
1063 let shutdown_token = CancellationToken::new();
1065 let catch_up = config.catch_up;
1066 let restart_policy = config.restart_policy;
1067
1068 let inner = Arc::new(ClientInner {
1069 writer: Mutex::new(writer),
1070 write_half: Mutex::new(write_half),
1071 pending: pending.clone(),
1072 reconnect_tx,
1073 network_hint_tx,
1074 shutdown_token: shutdown_token.clone(),
1075 catch_up,
1076 restart_policy,
1077 home_dc_id: Mutex::new(home_dc_id),
1078 dc_options: Mutex::new(dc_opts),
1079 media_dc_options: Mutex::new(media_dc_opts),
1080 peer_cache: RwLock::new(PeerCache::default()),
1081 pts_state: Mutex::new(pts::PtsState::default()),
1082 possible_gap: Mutex::new(pts::PossibleGapBuffer::new()),
1083 api_id: config.api_id,
1084 api_hash: config.api_hash,
1085 device_model: config.device_model,
1086 system_version: config.system_version,
1087 app_version: config.app_version,
1088 system_lang_code: config.system_lang_code,
1089 lang_pack: config.lang_pack,
1090 lang_code: config.lang_code,
1091 retry_policy: config.retry_policy,
1092 socks5: config.socks5,
1093 mtproxy: config.mtproxy,
1094 allow_ipv6: config.allow_ipv6,
1095 transport: config.transport,
1096 session_backend: config.session_backend,
1097 dc_pool: Mutex::new(pool),
1098 update_tx,
1099 is_bot: std::sync::atomic::AtomicBool::new(false),
1100 stream_active: std::sync::atomic::AtomicBool::new(false),
1101 salt_request_in_flight: std::sync::atomic::AtomicBool::new(false),
1102 dh_in_progress: std::sync::atomic::AtomicBool::new(false),
1103 });
1104
1105 let client = Self {
1106 inner,
1107 _update_rx: Arc::new(Mutex::new(update_rx)),
1108 };
1109
1110 {
1113 let client_r = client.clone();
1114 let shutdown_r = shutdown_token.clone();
1115 tokio::spawn(async move {
1116 client_r
1117 .run_reader_task(
1118 read_half,
1119 frame_kind,
1120 auth_key,
1121 session_id,
1122 reconnect_rx,
1123 network_hint_rx,
1124 shutdown_r,
1125 )
1126 .await;
1127 });
1128 }
1129
1130 if let Err(e) = client.init_connection().await {
1137 let key_is_stale = match &e {
1138 InvocationError::Rpc(r) if r.code == -404 => true,
1139 _ => false,
1142 };
1143
1144 let dh_allowed = key_is_stale
1148 && client
1149 .inner
1150 .dh_in_progress
1151 .compare_exchange(
1152 false,
1153 true,
1154 std::sync::atomic::Ordering::SeqCst,
1155 std::sync::atomic::Ordering::SeqCst,
1156 )
1157 .is_ok();
1158
1159 if dh_allowed {
1160 tracing::warn!("[layer] init_connection: definitive bad-key ({e}), fresh DH …");
1161 {
1162 let home_dc_id = *client.inner.home_dc_id.lock().await;
1163 let mut opts = client.inner.dc_options.lock().await;
1164 if let Some(entry) = opts.get_mut(&home_dc_id)
1165 && entry.auth_key.is_some()
1166 {
1167 tracing::warn!("[layer] Clearing stale auth key for DC{home_dc_id}");
1168 entry.auth_key = None;
1169 entry.first_salt = 0;
1170 entry.time_offset = 0;
1171 }
1172 }
1173 client.save_session().await.ok();
1174 client.inner.pending.lock().await.clear();
1175
1176 let socks5_r = client.inner.socks5.clone();
1177 let mtproxy_r = client.inner.mtproxy.clone();
1178 let transport_r = client.inner.transport.clone();
1179
1180 let home_dc_id_r = *client.inner.home_dc_id.lock().await;
1184 let addr_r = {
1185 let opts = client.inner.dc_options.lock().await;
1186 opts.get(&home_dc_id_r)
1187 .map(|e| e.addr.clone())
1188 .unwrap_or_else(|| {
1189 crate::dc_migration::fallback_dc_addr(home_dc_id_r).to_string()
1190 })
1191 };
1192 let new_conn = Connection::connect_raw(
1193 &addr_r,
1194 socks5_r.as_ref(),
1195 mtproxy_r.as_ref(),
1196 &transport_r,
1197 home_dc_id_r as i16,
1198 )
1199 .await?;
1200
1201 let (new_writer, new_wh, new_read, new_fk) = new_conn.into_writer();
1203 {
1205 let mut opts_guard = client.inner.dc_options.lock().await;
1206 if let Some(entry) = opts_guard.get_mut(&home_dc_id_r) {
1207 entry.auth_key = Some(new_writer.auth_key_bytes());
1208 entry.first_salt = new_writer.first_salt();
1209 entry.time_offset = new_writer.time_offset();
1210 }
1211 }
1212 let new_ak = new_writer.enc.auth_key_bytes();
1214 let new_sid = new_writer.enc.session_id();
1215 *client.inner.writer.lock().await = new_writer;
1216 *client.inner.write_half.lock().await = new_wh;
1217 let _ = client
1218 .inner
1219 .reconnect_tx
1220 .send((new_read, new_fk, new_ak, new_sid));
1221 tokio::task::yield_now().await;
1222
1223 {
1229 let mut attempt = 0u32;
1230 const MAX_ATTEMPTS: u32 = 5;
1231 loop {
1232 match client.init_connection().await {
1233 Ok(()) => break,
1234 Err(InvocationError::Rpc(ref r))
1235 if r.code == 401 && attempt < MAX_ATTEMPTS =>
1236 {
1237 let delay =
1238 std::time::Duration::from_millis(500 * (1u64 << attempt));
1239 tracing::warn!(
1240 "[layer] init_connection AUTH_KEY_UNREGISTERED \
1241 (attempt {}/{MAX_ATTEMPTS}): key not yet propagated, \
1242 retrying in {delay:?}",
1243 attempt + 1,
1244 );
1245 tokio::time::sleep(delay).await;
1246 attempt += 1;
1247 }
1248 Err(e) => return Err(e),
1249 }
1250 }
1251 }
1252 client
1253 .inner
1254 .dh_in_progress
1255 .store(false, std::sync::atomic::Ordering::SeqCst);
1256 client.save_session().await.ok();
1258
1259 tracing::warn!(
1260 "[layer] Session invalidated and reset. \
1261 Call is_authorized() and re-authenticate if needed."
1262 );
1263 } else {
1264 return Err(e);
1265 }
1266 }
1267
1268 if let Some(ref s) = loaded_session
1280 && !s.peers.is_empty()
1281 {
1282 let mut cache = client.inner.peer_cache.write().await;
1283 for p in &s.peers {
1284 if p.is_channel {
1285 cache.channels.entry(p.id).or_insert(p.access_hash);
1286 } else {
1287 cache.users.entry(p.id).or_insert(p.access_hash);
1288 }
1289 }
1290 tracing::debug!(
1291 "[layer] Peer cache restored: {} users, {} channels",
1292 cache.users.len(),
1293 cache.channels.len()
1294 );
1295 }
1296
1297 let has_saved_state = loaded_session
1307 .as_ref()
1308 .is_some_and(|s| s.updates_state.is_initialised());
1309
1310 if catch_up && has_saved_state {
1311 let snap = &loaded_session.as_ref().unwrap().updates_state;
1312 let mut state = client.inner.pts_state.lock().await;
1313 state.pts = snap.pts;
1314 state.qts = snap.qts;
1315 state.date = snap.date;
1316 state.seq = snap.seq;
1317 for &(cid, cpts) in &snap.channels {
1318 state.channel_pts.insert(cid, cpts);
1319 }
1320 tracing::info!(
1321 "[layer] Update state restored: pts={}, qts={}, seq={}, {} channels",
1322 state.pts,
1323 state.qts,
1324 state.seq,
1325 state.channel_pts.len()
1326 );
1327 drop(state);
1328
1329 let channel_ids: Vec<i64> = snap.channels.iter().map(|&(cid, _)| cid).collect();
1333
1334 let c = client.clone();
1337 let utx = client.inner.update_tx.clone();
1338 tokio::spawn(async move {
1339 match c.get_difference().await {
1341 Ok(missed) => {
1342 tracing::info!(
1343 "[layer] catch_up: {} global updates replayed",
1344 missed.len()
1345 );
1346 for u in missed {
1347 if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1348 tracing::warn!(
1349 "[layer] update channel full: dropping catch-up update"
1350 );
1351 break;
1352 }
1353 }
1354 }
1355 Err(e) => tracing::warn!("[layer] catch_up getDifference: {e}"),
1356 }
1357
1358 if !channel_ids.is_empty() {
1363 tracing::info!(
1364 "[layer] catch_up: per-channel diff for {} channels",
1365 channel_ids.len()
1366 );
1367 let sem = std::sync::Arc::new(tokio::sync::Semaphore::new(10));
1368 for channel_id in channel_ids {
1369 let c2 = c.clone();
1370 let utx2 = utx.clone();
1371 let permit = sem.clone().acquire_owned().await.unwrap();
1372 tokio::spawn(async move {
1373 let _permit = permit; match c2.get_channel_difference(channel_id).await {
1375 Ok(updates) => {
1376 if !updates.is_empty() {
1377 tracing::debug!(
1378 "[layer] catch_up channel {channel_id}: {} updates",
1379 updates.len()
1380 );
1381 }
1382 for u in updates {
1383 if utx2.try_send(u).is_err() {
1384 tracing::warn!(
1385 "[layer] update channel full: dropping channel diff update"
1386 );
1387 break;
1388 }
1389 }
1390 }
1391 Err(e) => {
1392 tracing::warn!("[layer] catch_up channel {channel_id}: {e}")
1393 }
1394 }
1395 });
1396 }
1397 }
1398 });
1399 } else {
1400 let _ = client.sync_pts_state().await;
1402 }
1403
1404 Ok((client, shutdown_token))
1405 }
1406
1407 async fn fresh_connect(
1408 socks5: Option<&crate::socks5::Socks5Config>,
1409 mtproxy: Option<&crate::proxy::MtProxyConfig>,
1410 transport: &TransportKind,
1411 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
1412 tracing::debug!("[layer] Fresh connect to DC2 …");
1413 let conn = Connection::connect_raw(
1414 crate::dc_migration::fallback_dc_addr(2),
1415 socks5,
1416 mtproxy,
1417 transport,
1418 2i16,
1419 )
1420 .await?;
1421 let opts = session::default_dc_addresses()
1422 .into_iter()
1423 .map(|(id, addr)| {
1424 (
1425 id,
1426 DcEntry {
1427 dc_id: id,
1428 addr,
1429 auth_key: None,
1430 first_salt: 0,
1431 time_offset: 0,
1432 flags: DcFlags::NONE,
1433 },
1434 )
1435 })
1436 .collect();
1437 Ok((conn, 2, opts))
1438 }
1439
1440 async fn build_persisted_session(&self) -> PersistedSession {
1448 use session::{CachedPeer, UpdatesStateSnap};
1449
1450 let writer_guard = self.inner.writer.lock().await;
1451 let home_dc_id = *self.inner.home_dc_id.lock().await;
1452 let dc_options = self.inner.dc_options.lock().await;
1453
1454 let mut dcs: Vec<DcEntry> = dc_options
1455 .values()
1456 .map(|e| DcEntry {
1457 dc_id: e.dc_id,
1458 addr: e.addr.clone(),
1459 auth_key: if e.dc_id == home_dc_id {
1460 Some(writer_guard.auth_key_bytes())
1461 } else {
1462 e.auth_key
1463 },
1464 first_salt: if e.dc_id == home_dc_id {
1465 writer_guard.first_salt()
1466 } else {
1467 e.first_salt
1468 },
1469 time_offset: if e.dc_id == home_dc_id {
1470 writer_guard.time_offset()
1471 } else {
1472 e.time_offset
1473 },
1474 flags: e.flags,
1475 })
1476 .collect();
1477 {
1479 let media_opts = self.inner.media_dc_options.lock().await;
1480 for e in media_opts.values() {
1481 dcs.push(e.clone());
1482 }
1483 }
1484 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
1485
1486 let pts_snap = {
1487 let s = self.inner.pts_state.lock().await;
1488 UpdatesStateSnap {
1489 pts: s.pts,
1490 qts: s.qts,
1491 date: s.date,
1492 seq: s.seq,
1493 channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
1494 }
1495 };
1496
1497 let peers: Vec<CachedPeer> = {
1498 let cache = self.inner.peer_cache.read().await;
1499 let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
1500 for (&id, &hash) in &cache.users {
1501 v.push(CachedPeer {
1502 id,
1503 access_hash: hash,
1504 is_channel: false,
1505 });
1506 }
1507 for (&id, &hash) in &cache.channels {
1508 v.push(CachedPeer {
1509 id,
1510 access_hash: hash,
1511 is_channel: true,
1512 });
1513 }
1514 v
1515 };
1516
1517 PersistedSession {
1518 home_dc_id,
1519 dcs,
1520 updates_state: pts_snap,
1521 peers,
1522 }
1523 }
1524
1525 pub async fn save_session(&self) -> Result<(), InvocationError> {
1527 let session = self.build_persisted_session().await;
1528 self.inner
1529 .session_backend
1530 .save(&session)
1531 .map_err(InvocationError::Io)?;
1532 tracing::debug!("[layer] Session saved ✓");
1533 Ok(())
1534 }
1535
1536 pub async fn export_session_string(&self) -> Result<String, InvocationError> {
1543 Ok(self.build_persisted_session().await.to_string())
1544 }
1545
1546 pub async fn media_dc_addr(&self, dc_id: i32) -> Option<String> {
1552 self.inner
1553 .media_dc_options
1554 .lock()
1555 .await
1556 .get(&dc_id)
1557 .map(|e| e.addr.clone())
1558 }
1559
1560 pub async fn best_media_dc_addr(&self) -> Option<(i32, String)> {
1563 let home = *self.inner.home_dc_id.lock().await;
1564 let media = self.inner.media_dc_options.lock().await;
1565 media
1566 .get(&home)
1567 .map(|e| (home, e.addr.clone()))
1568 .or_else(|| media.iter().next().map(|(&id, e)| (id, e.addr.clone())))
1569 }
1570
1571 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
1573 match self.invoke(&tl::functions::updates::GetState {}).await {
1574 Ok(_) => Ok(true),
1575 Err(e)
1576 if e.is("AUTH_KEY_UNREGISTERED")
1577 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) =>
1578 {
1579 Ok(false)
1580 }
1581 Err(e) => Err(e),
1582 }
1583 }
1584
1585 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
1587 let req = tl::functions::auth::ImportBotAuthorization {
1588 flags: 0,
1589 api_id: self.inner.api_id,
1590 api_hash: self.inner.api_hash.clone(),
1591 bot_auth_token: token.to_string(),
1592 };
1593
1594 let result = self.invoke(&req).await?;
1595
1596 let name = match result {
1597 tl::enums::auth::Authorization::Authorization(a) => {
1598 self.cache_user(&a.user).await;
1599 Self::extract_user_name(&a.user)
1600 }
1601 tl::enums::auth::Authorization::SignUpRequired(_) => {
1602 return Err(InvocationError::Deserialize(
1603 "unexpected SignUpRequired during bot sign-in".into(),
1604 ));
1605 }
1606 };
1607 tracing::info!("[layer] Bot signed in ✓ ({name})");
1608 self.inner
1609 .is_bot
1610 .store(true, std::sync::atomic::Ordering::Relaxed);
1611 Ok(name)
1612 }
1613
1614 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
1616 use tl::enums::auth::SentCode;
1617
1618 let req = self.make_send_code_req(phone);
1619 let body = self.rpc_call_raw(&req).await?;
1620
1621 let mut cur = Cursor::from_slice(&body);
1622 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
1623 SentCode::SentCode(s) => s.phone_code_hash,
1624 SentCode::Success(_) => {
1625 return Err(InvocationError::Deserialize("unexpected Success".into()));
1626 }
1627 SentCode::PaymentRequired(_) => {
1628 return Err(InvocationError::Deserialize(
1629 "payment required to send code".into(),
1630 ));
1631 }
1632 };
1633 tracing::info!("[layer] Login code sent");
1634 Ok(LoginToken {
1635 phone: phone.to_string(),
1636 phone_code_hash: hash,
1637 })
1638 }
1639
1640 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
1642 let req = tl::functions::auth::SignIn {
1643 phone_number: token.phone.clone(),
1644 phone_code_hash: token.phone_code_hash.clone(),
1645 phone_code: Some(code.trim().to_string()),
1646 email_verification: None,
1647 };
1648
1649 let body = match self.rpc_call_raw(&req).await {
1650 Ok(b) => b,
1651 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
1652 let t = self.get_password_info().await.map_err(SignInError::Other)?;
1653 return Err(SignInError::PasswordRequired(Box::new(t)));
1654 }
1655 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
1656 Err(e) => return Err(SignInError::Other(e)),
1657 };
1658
1659 let mut cur = Cursor::from_slice(&body);
1660 match tl::enums::auth::Authorization::deserialize(&mut cur)
1661 .map_err(|e| SignInError::Other(e.into()))?
1662 {
1663 tl::enums::auth::Authorization::Authorization(a) => {
1664 self.cache_user(&a.user).await;
1665 let name = Self::extract_user_name(&a.user);
1666 tracing::info!("[layer] Signed in ✓ Welcome, {name}!");
1667 Ok(name)
1668 }
1669 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
1670 }
1671 }
1672
1673 pub async fn check_password(
1675 &self,
1676 token: PasswordToken,
1677 password: impl AsRef<[u8]>,
1678 ) -> Result<String, InvocationError> {
1679 let pw = token.password;
1680 let algo = pw
1681 .current_algo
1682 .ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
1683 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
1684 let g_b = pw
1685 .srp_b
1686 .ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
1687 let a = pw.secure_random;
1688 let srp_id = pw
1689 .srp_id
1690 .ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
1691
1692 let (m1, g_a) =
1693 two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
1694 let req = tl::functions::auth::CheckPassword {
1695 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
1696 tl::types::InputCheckPasswordSrp {
1697 srp_id,
1698 a: g_a.to_vec(),
1699 m1: m1.to_vec(),
1700 },
1701 ),
1702 };
1703
1704 let body = self.rpc_call_raw(&req).await?;
1705 let mut cur = Cursor::from_slice(&body);
1706 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
1707 tl::enums::auth::Authorization::Authorization(a) => {
1708 self.cache_user(&a.user).await;
1709 let name = Self::extract_user_name(&a.user);
1710 tracing::info!("[layer] 2FA ✓ Welcome, {name}!");
1711 Ok(name)
1712 }
1713 tl::enums::auth::Authorization::SignUpRequired(_) => Err(InvocationError::Deserialize(
1714 "unexpected SignUpRequired after 2FA".into(),
1715 )),
1716 }
1717 }
1718
1719 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
1721 let req = tl::functions::auth::LogOut {};
1722 match self.rpc_call_raw(&req).await {
1723 Ok(_) => {
1724 tracing::info!("[layer] Signed out ✓");
1725 Ok(true)
1726 }
1727 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
1728 Err(e) => Err(e),
1729 }
1730 }
1731
1732 pub async fn get_users_by_id(
1740 &self,
1741 ids: &[i64],
1742 ) -> Result<Vec<Option<crate::types::User>>, InvocationError> {
1743 let cache = self.inner.peer_cache.read().await;
1744 let input_ids: Vec<tl::enums::InputUser> = ids
1745 .iter()
1746 .map(|&id| {
1747 if id == 0 {
1748 tl::enums::InputUser::UserSelf
1749 } else {
1750 let hash = cache.users.get(&id).copied().unwrap_or(0);
1751 tl::enums::InputUser::InputUser(tl::types::InputUser {
1752 user_id: id,
1753 access_hash: hash,
1754 })
1755 }
1756 })
1757 .collect();
1758 drop(cache);
1759 let req = tl::functions::users::GetUsers { id: input_ids };
1760 let body = self.rpc_call_raw(&req).await?;
1761 let mut cur = Cursor::from_slice(&body);
1762 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1763 self.cache_users_slice(&users).await;
1764 Ok(users
1765 .into_iter()
1766 .map(crate::types::User::from_raw)
1767 .collect())
1768 }
1769
1770 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
1772 let req = tl::functions::users::GetUsers {
1773 id: vec![tl::enums::InputUser::UserSelf],
1774 };
1775 let body = self.rpc_call_raw(&req).await?;
1776 let mut cur = Cursor::from_slice(&body);
1777 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1778 self.cache_users_slice(&users).await;
1779 users
1780 .into_iter()
1781 .find_map(|u| match u {
1782 tl::enums::User::User(u) => Some(u),
1783 _ => None,
1784 })
1785 .ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
1786 }
1787
1788 pub fn stream_updates(&self) -> UpdateStream {
1796 if self
1800 .inner
1801 .stream_active
1802 .swap(true, std::sync::atomic::Ordering::SeqCst)
1803 {
1804 panic!(
1805 "stream_updates() called twice on the same Client: only one UpdateStream is supported per client"
1806 );
1807 }
1808 let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
1809 let internal_rx = self._update_rx.clone();
1810 tokio::spawn(async move {
1811 let mut guard = internal_rx.lock().await;
1812 while let Some(upd) = guard.recv().await {
1813 if caller_tx.send(upd).is_err() {
1814 break;
1815 }
1816 }
1817 });
1818 UpdateStream { rx }
1819 }
1820
1821 pub fn signal_network_restored(&self) {
1834 let _ = self.inner.network_hint_tx.send(());
1835 }
1836
1837 #[allow(clippy::too_many_arguments)]
1871 async fn run_reader_task(
1872 &self,
1873 read_half: OwnedReadHalf,
1874 frame_kind: FrameKind,
1875 auth_key: [u8; 256],
1876 session_id: i64,
1877 mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1878 mut network_hint_rx: mpsc::UnboundedReceiver<()>,
1879 shutdown_token: CancellationToken,
1880 ) {
1881 let mut rh = read_half;
1882 let mut fk = frame_kind;
1883 let mut ak = auth_key;
1884 let mut sid = session_id;
1885 let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
1888 let mut restart_count: u32 = 0;
1889
1890 loop {
1891 tokio::select! {
1892 _ = shutdown_token.cancelled() => {
1894 tracing::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
1895 let mut pending = self.inner.pending.lock().await;
1896 for (_, tx) in pending.drain() {
1897 let _ = tx.send(Err(InvocationError::Dropped));
1898 }
1899 return;
1900 }
1901
1902 _ = self.reader_loop(
1904 rh, fk, ak, sid,
1905 restart_init_rx.take(),
1906 &mut new_conn_rx, &mut network_hint_rx,
1907 ) => {}
1908 }
1909
1910 if shutdown_token.is_cancelled() {
1913 tracing::debug!("[layer] Reader task: exiting after loop (shutdown).");
1914 return;
1915 }
1916
1917 restart_count += 1;
1918 tracing::error!(
1919 "[layer] Reader loop exited unexpectedly (restart #{restart_count}): supervisor reconnecting …"
1920 );
1921
1922 {
1924 let mut pending = self.inner.pending.lock().await;
1925 for (_, tx) in pending.drain() {
1926 let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
1927 std::io::ErrorKind::ConnectionReset,
1928 "reader task restarted",
1929 ))));
1930 }
1931 }
1932 self.inner.writer.lock().await.sent_bodies.clear();
1934
1935 let mut delay_ms = RECONNECT_BASE_MS;
1937 let new_conn = loop {
1938 tracing::debug!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
1939 tokio::select! {
1940 _ = shutdown_token.cancelled() => {
1941 tracing::debug!("[layer] Supervisor: shutdown during reconnect, exiting.");
1942 return;
1943 }
1944 _ = sleep(Duration::from_millis(delay_ms)) => {}
1945 }
1946
1947 let dummy_ak = [0u8; 256];
1952 let dummy_fk = FrameKind::Abridged;
1953 match self.do_reconnect(&dummy_ak, &dummy_fk).await {
1954 Ok(conn) => break conn,
1955 Err(e) => {
1956 tracing::warn!("[layer] Supervisor: reconnect failed ({e})");
1957 let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
1958 delay_ms = jitter_delay(next).as_millis() as u64;
1959 }
1960 }
1961 };
1962
1963 let (new_rh, new_fk, new_ak, new_sid) = new_conn;
1964 rh = new_rh;
1965 fk = new_fk;
1966 ak = new_ak;
1967 sid = new_sid;
1968
1969 let (init_tx, init_rx) = oneshot::channel();
1972 let c = self.clone();
1973 let utx = self.inner.update_tx.clone();
1974 tokio::spawn(async move {
1975 let result = loop {
1977 match c.init_connection().await {
1978 Ok(()) => break Ok(()),
1979 Err(InvocationError::Rpc(ref r)) if r.flood_wait_seconds().is_some() => {
1980 let secs = r.flood_wait_seconds().unwrap();
1981 tracing::warn!(
1982 "[layer] Supervisor init_connection FLOOD_WAIT_{secs}: waiting"
1983 );
1984 sleep(Duration::from_secs(secs + 1)).await;
1985 }
1986 Err(e) => break Err(e),
1987 }
1988 };
1989 if result.is_ok() {
1990 let missed = {
1996 let mut attempt = 0u32;
1997 const MAX_ATTEMPTS: u32 = 5;
1998 loop {
1999 match c.get_difference().await {
2000 Ok(updates) => break updates,
2001 Err(ref e)
2002 if matches!(e,
2003 InvocationError::Rpc(r) if r.code == 401)
2004 && attempt < MAX_ATTEMPTS =>
2005 {
2006 let delay = Duration::from_millis(500 * (1u64 << attempt));
2007 tracing::warn!(
2008 "[layer] getDifference AUTH_KEY_UNREGISTERED \
2009 (attempt {}/{MAX_ATTEMPTS}): retrying in {delay:?}",
2010 attempt + 1,
2011 );
2012 sleep(delay).await;
2013 attempt += 1;
2014 }
2015 Err(ref e)
2016 if matches!(e,
2017 InvocationError::Rpc(r) if r.code == 401) =>
2018 {
2019 tracing::warn!(
2020 "[layer] getDifference AUTH_KEY_UNREGISTERED \
2021 after {MAX_ATTEMPTS} retries: falling back to \
2022 sync_pts_state"
2023 );
2024 let _ = c.sync_pts_state().await;
2025 break vec![];
2026 }
2027 Err(e) => {
2028 tracing::warn!(
2029 "[layer] getDifference failed after reconnect: {e}"
2030 );
2031 break vec![];
2032 }
2033 }
2034 }
2035 };
2036 for u in missed {
2037 if utx.try_send(u).is_err() {
2038 tracing::warn!("[layer] update channel full: dropping catch-up update");
2039 break;
2040 }
2041 }
2042 }
2043 let _ = init_tx.send(result);
2044 });
2045 restart_init_rx = Some(init_rx);
2046
2047 tracing::debug!(
2048 "[layer] Supervisor: restarting reader loop (restart #{restart_count}) …"
2049 );
2050 }
2052 }
2053
2054 #[allow(clippy::too_many_arguments)]
2055 async fn reader_loop(
2056 &self,
2057 mut rh: OwnedReadHalf,
2058 mut fk: FrameKind,
2059 mut ak: [u8; 256],
2060 mut sid: i64,
2061 initial_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>>,
2064 new_conn_rx: &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
2065 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
2066 ) {
2067 let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
2072 let mut init_fail_count: u32 = 0;
2077
2078 let mut gap_tick = tokio::time::interval(std::time::Duration::from_millis(1500));
2079 gap_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2080
2081 let mut restart_interval = self.inner.restart_policy.restart_interval().map(|d| {
2082 let mut i = tokio::time::interval(d);
2083 i.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2084 i
2085 });
2086 if let Some(ref mut i) = restart_interval {
2087 i.tick().await;
2088 }
2089
2090 loop {
2091 tokio::select! {
2092 _ = gap_tick.tick() => {
2096 if self.inner.possible_gap.lock().await.has_global() {
2101 let gap_expired = self.inner.possible_gap.lock().await.global_deadline_elapsed();
2102 if gap_expired {
2103 let c = self.clone();
2104 tokio::spawn(async move {
2105 if let Err(e) = c.check_update_deadline().await {
2106 tracing::warn!("[layer] gap tick getDifference: {e}");
2107 }
2108 });
2109 }
2110 }
2111 }
2112 _ = async {
2113 if let Some(ref mut i) = restart_interval { i.tick().await; }
2114 else { std::future::pending::<()>().await; }
2115 } => {
2116 tracing::info!("[layer] scheduled restart: reconnecting");
2117 let _ = self.inner.write_half.lock().await.shutdown().await;
2118 let _ = self.inner.network_hint_tx.send(());
2119 }
2120 outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
2122 match outcome {
2123 FrameOutcome::Frame(mut raw) => {
2124 let time_offset = self.inner.writer.lock().await.enc.time_offset;
2129 let msg = match EncryptedSession::decrypt_frame_with_offset(
2130 &ak, sid, &mut raw, time_offset,
2131 ) {
2132 Ok(m) => m,
2133 Err(e) => {
2134 tracing::warn!("[layer] Decrypt error: {e:?}: failing pending waiters and reconnecting");
2140 drop(init_rx.take());
2141 {
2142 let mut pending = self.inner.pending.lock().await;
2143 let msg = format!("decrypt error: {e}");
2144 for (_, tx) in pending.drain() {
2145 let _ = tx.send(Err(InvocationError::Io(
2146 std::io::Error::new(
2147 std::io::ErrorKind::InvalidData,
2148 msg.clone(),
2149 )
2150 )));
2151 }
2152 }
2153 self.inner.writer.lock().await.sent_bodies.clear();
2154 match self.do_reconnect_loop(
2155 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
2156 network_hint_rx,
2157 ).await {
2158 Some(rx) => { init_rx = Some(rx); }
2159 None => return,
2160 }
2161 continue;
2162 }
2163 };
2164
2165 if msg.bad_time {
2172 let mut w = self.inner.writer.lock().await;
2173 w.enc.correct_time_offset(msg.msg_id);
2174 tracing::debug!(
2175 "[layer] bad_time: server msg_id={} outside ±300 s window; \
2176 corrected time_offset to {} s",
2177 msg.msg_id,
2178 w.enc.time_offset
2179 );
2180 }
2181 self.route_frame(msg.body, msg.msg_id).await;
2186
2187 }
2192
2193 FrameOutcome::Error(e) => {
2194 tracing::warn!("[layer] Reader: connection error: {e}");
2195 drop(init_rx.take()); if let InvocationError::Rpc(r) = &e {
2200 match r.code {
2201 -429 => tracing::error!(
2202 "[layer] transport: -429 flood code - \
2203 Telegram is rate-limiting this connection"
2204 ),
2205 -444 => {
2206 tracing::error!(
2212 "[layer] transport: -444 bad dc_id - \
2213 DC options may be stale, resetting address \
2214 to trigger config refresh on reconnect"
2215 );
2216 let home_dc_id =
2217 *self.inner.home_dc_id.lock().await;
2218 let fallback =
2219 crate::dc_migration::fallback_dc_addr(home_dc_id)
2220 .to_string();
2221 let mut opts =
2222 self.inner.dc_options.lock().await;
2223 if let Some(entry) = opts.get_mut(&home_dc_id) {
2224 tracing::warn!(
2225 "[layer] -444: resetting DC{home_dc_id} \
2226 addr → {fallback}"
2227 );
2228 entry.addr = fallback;
2229 }
2230 }
2231 _ => {}
2232 }
2233 }
2234
2235 let key_is_stale = match &e {
2244 InvocationError::Rpc(r) if r.code == -404 => true,
2245 _ => false,
2247 };
2248 let clear_key = key_is_stale
2252 && self.inner.dh_in_progress
2253 .compare_exchange(false, true,
2254 std::sync::atomic::Ordering::SeqCst,
2255 std::sync::atomic::Ordering::SeqCst)
2256 .is_ok();
2257 if clear_key {
2258 let home_dc_id = *self.inner.home_dc_id.lock().await;
2259 let mut opts = self.inner.dc_options.lock().await;
2260 if let Some(entry) = opts.get_mut(&home_dc_id) {
2261 tracing::warn!(
2262 "[layer] Stale auth key on DC{home_dc_id} ({e}) \
2263 : clearing for fresh DH"
2264 );
2265 entry.auth_key = None;
2266 }
2267 }
2268
2269 {
2272 let mut pending = self.inner.pending.lock().await;
2273 let msg = e.to_string();
2274 for (_, tx) in pending.drain() {
2275 let _ = tx.send(Err(InvocationError::Io(
2276 std::io::Error::new(
2277 std::io::ErrorKind::ConnectionReset, msg.clone()))));
2278 }
2279 }
2280 self.inner.writer.lock().await.sent_bodies.clear();
2282
2283 let reconnect_delay = if clear_key { 0 } else { RECONNECT_BASE_MS };
2286 match self.do_reconnect_loop(
2287 reconnect_delay, &mut rh, &mut fk, &mut ak, &mut sid,
2288 network_hint_rx,
2289 ).await {
2290 Some(rx) => {
2291 self.inner.dh_in_progress
2294 .store(false, std::sync::atomic::Ordering::SeqCst);
2295 init_rx = Some(rx);
2296 }
2297 None => {
2298 self.inner.dh_in_progress
2299 .store(false, std::sync::atomic::Ordering::SeqCst);
2300 return; }
2302 }
2303 }
2304
2305 FrameOutcome::Keepalive => {
2306 let c = self.clone();
2310 tokio::spawn(async move {
2311 if let Err(e) = c.check_update_deadline().await {
2312 tracing::warn!("[layer] check_update_deadline: {e}");
2313 }
2314 });
2315 }
2316 }
2317 }
2318
2319 maybe = new_conn_rx.recv() => {
2321 if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
2322 rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
2323 tracing::debug!("[layer] Reader: switched to new connection.");
2324 } else {
2325 break; }
2327 }
2328
2329
2330 init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
2332 init_rx = None;
2333 match init_result {
2334 Ok(Ok(())) => {
2335 init_fail_count = 0;
2336 tracing::info!("[layer] Reconnected to Telegram ✓: session live, replaying missed updates …");
2345 }
2346
2347 Ok(Err(e)) => {
2348 let key_is_stale = matches!(&e, InvocationError::Rpc(r) if r.code == -404);
2353 let dh_claimed = key_is_stale
2355 && self.inner.dh_in_progress
2356 .compare_exchange(false, true,
2357 std::sync::atomic::Ordering::SeqCst,
2358 std::sync::atomic::Ordering::SeqCst)
2359 .is_ok();
2360
2361 if dh_claimed {
2362 tracing::warn!(
2363 "[layer] init_connection: definitive bad-key ({e}) \
2364 : clearing auth key for fresh DH …"
2365 );
2366 init_fail_count = 0;
2367 let home_dc_id = *self.inner.home_dc_id.lock().await;
2368 let mut opts = self.inner.dc_options.lock().await;
2369 if let Some(entry) = opts.get_mut(&home_dc_id) {
2370 entry.auth_key = None;
2371 }
2372 } else {
2374 init_fail_count += 1;
2375 tracing::warn!(
2376 "[layer] init_connection failed (attempt {init_fail_count}, {e}) \
2377 : retrying with same key …"
2378 );
2379 }
2380 {
2381 let mut pending = self.inner.pending.lock().await;
2382 let msg = e.to_string();
2383 for (_, tx) in pending.drain() {
2384 let _ = tx.send(Err(InvocationError::Io(
2385 std::io::Error::new(
2386 std::io::ErrorKind::ConnectionReset, msg.clone()))));
2387 }
2388 }
2389 match self.do_reconnect_loop(
2390 0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
2391 ).await {
2392 Some(rx) => { init_rx = Some(rx); }
2393 None => return,
2394 }
2395 }
2396
2397 Err(_) => {
2398 tracing::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
2400 match self.do_reconnect_loop(
2401 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
2402 network_hint_rx,
2403 ).await {
2404 Some(rx) => { init_rx = Some(rx); }
2405 None => return,
2406 }
2407 }
2408 }
2409 }
2410 }
2411 }
2412 }
2413
2414 async fn route_frame(&self, body: Vec<u8>, msg_id: i64) {
2416 if body.len() < 4 {
2417 return;
2418 }
2419 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2420
2421 match cid {
2422 ID_RPC_RESULT => {
2423 if body.len() < 12 {
2424 return;
2425 }
2426 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2427 let inner = body[12..].to_vec();
2428 self.inner.writer.lock().await.pending_ack.push(msg_id);
2430 let result = unwrap_envelope(inner);
2431 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
2432 self.inner
2434 .writer
2435 .lock()
2436 .await
2437 .sent_bodies
2438 .remove(&req_msg_id);
2439 self.inner
2441 .writer
2442 .lock()
2443 .await
2444 .container_map
2445 .retain(|_, inner| *inner != req_msg_id);
2446 let to_send = match result {
2447 Ok(EnvelopeResult::Payload(p)) => Ok(p),
2448 Ok(EnvelopeResult::RawUpdates(bodies)) => {
2449 let c = self.clone();
2454 tokio::spawn(async move {
2455 for body in bodies {
2456 c.dispatch_updates(&body).await;
2457 }
2458 });
2459 Ok(vec![])
2460 }
2461 Ok(EnvelopeResult::Pts(pts, pts_count)) => {
2462 let c = self.clone();
2464 tokio::spawn(async move {
2465 match c.check_and_fill_gap(pts, pts_count, None).await {
2466 Ok(replayed) => {
2467 for u in replayed {
2469 let _ = c.inner.update_tx.try_send(u);
2470 }
2471 }
2472 Err(e) => tracing::warn!(
2473 "[layer] updateShortSentMessage pts advance: {e}"
2474 ),
2475 }
2476 });
2477 Ok(vec![])
2478 }
2479 Ok(EnvelopeResult::None) => Ok(vec![]),
2480 Err(e) => {
2481 tracing::debug!(
2482 "[layer] rpc_result deserialize failure for msg_id={req_msg_id}: {e}"
2483 );
2484 Err(e)
2485 }
2486 };
2487 let _ = tx.send(to_send);
2488 }
2489 }
2490 ID_RPC_ERROR => {
2491 tracing::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
2492 }
2493 ID_MSG_CONTAINER => {
2494 if body.len() < 8 {
2495 return;
2496 }
2497 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
2498 let mut pos = 8usize;
2499 for _ in 0..count {
2500 if pos + 16 > body.len() {
2501 break;
2502 }
2503 let inner_msg_id = i64::from_le_bytes(body[pos..pos + 8].try_into().unwrap());
2505 let inner_len =
2506 u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
2507 pos += 16;
2508 if pos + inner_len > body.len() {
2509 break;
2510 }
2511 let inner = body[pos..pos + inner_len].to_vec();
2512 pos += inner_len;
2513 Box::pin(self.route_frame(inner, inner_msg_id)).await;
2514 }
2515 }
2516 ID_GZIP_PACKED => {
2517 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
2518 if let Ok(inflated) = gz_inflate(&bytes) {
2519 Box::pin(self.route_frame(inflated, msg_id)).await;
2521 }
2522 }
2523 ID_BAD_SERVER_SALT => {
2524 if body.len() >= 28 {
2531 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2532 let new_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
2533
2534 {
2537 let mut w = self.inner.writer.lock().await;
2538 w.salts.clear();
2539 w.salts.push(FutureSalt {
2540 valid_since: 0,
2541 valid_until: i32::MAX,
2542 salt: new_salt,
2543 });
2544 w.enc.salt = new_salt;
2545 }
2546 tracing::debug!(
2547 "[layer] bad_server_salt: bad_msg_id={bad_msg_id} new_salt={new_salt:#x}"
2548 );
2549
2550 {
2555 let mut w = self.inner.writer.lock().await;
2556
2557 let resolved_id = if w.sent_bodies.contains_key(&bad_msg_id) {
2559 bad_msg_id
2560 } else if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2561 w.container_map.remove(&bad_msg_id);
2562 inner_id
2563 } else {
2564 bad_msg_id };
2566
2567 if let Some(orig_body) = w.sent_bodies.remove(&resolved_id) {
2568 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2569 let fk = w.frame_kind.clone();
2570 drop(w);
2573 let mut pending = self.inner.pending.lock().await;
2574 if let Some(tx) = pending.remove(&resolved_id) {
2575 pending.insert(new_msg_id, tx);
2576 drop(pending);
2577 if let Err(e) = send_frame_write(
2578 &mut *self.inner.write_half.lock().await,
2579 &wire,
2580 &fk,
2581 )
2582 .await
2583 {
2584 tracing::warn!("[layer] bad_server_salt re-send failed: {e}");
2585 } else {
2586 tracing::debug!(
2587 "[layer] bad_server_salt re-sent \
2588 {resolved_id}→{new_msg_id}"
2589 );
2590 }
2591 }
2592 } else {
2593 drop(w);
2596 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
2597 let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
2598 std::io::ErrorKind::InvalidData,
2599 "bad_server_salt on re-sent message; caller should retry",
2600 ))));
2601 }
2602 }
2603 }
2604
2605 self.spawn_salt_fetch_if_needed();
2607
2608 let remaining: Vec<(i64, Vec<u8>)> = {
2617 let mut w = self.inner.writer.lock().await;
2618 let keys: Vec<i64> = w.sent_bodies.keys().copied().collect();
2619 keys.into_iter()
2620 .filter_map(|k| w.sent_bodies.remove(&k).map(|v| (k, v)))
2621 .collect()
2622 };
2623
2624 for (old_id, body) in remaining {
2625 let (wire, new_id, fk) = {
2626 let mut w = self.inner.writer.lock().await;
2627 let (wire, new_id) = w.enc.pack_body_with_msg_id(&body, true);
2628 let fk = w.frame_kind.clone();
2629 w.sent_bodies.insert(new_id, body);
2631 (wire, new_id, fk)
2632 };
2633 let has_waiter = {
2634 let mut pending = self.inner.pending.lock().await;
2635 if let Some(tx) = pending.remove(&old_id) {
2636 pending.insert(new_id, tx);
2637 true
2638 } else {
2639 false
2640 }
2641 };
2642 if has_waiter {
2643 if let Err(e) = send_frame_write(
2644 &mut *self.inner.write_half.lock().await,
2645 &wire,
2646 &fk,
2647 )
2648 .await
2649 {
2650 tracing::warn!(
2651 "[layer] bad_server_salt resendAll: \
2652 re-send {old_id}→{new_id} failed: {e}"
2653 );
2654 self.inner.writer.lock().await.sent_bodies.remove(&new_id);
2655 } else {
2656 tracing::debug!(
2657 "[layer] bad_server_salt resendAll: \
2658 re-sent {old_id}→{new_id}"
2659 );
2660 }
2661 } else {
2662 self.inner.writer.lock().await.sent_bodies.remove(&new_id);
2664 }
2665 }
2666 }
2667 }
2668 ID_PONG => {
2669 if body.len() >= 20 {
2675 let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2676 self.inner.writer.lock().await.pending_ack.push(msg_id);
2678 if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
2679 let mut w = self.inner.writer.lock().await;
2680 w.sent_bodies.remove(&ping_msg_id);
2681 w.container_map.retain(|_, inner| *inner != ping_msg_id);
2682 drop(w);
2683 let _ = tx.send(Ok(body));
2684 }
2685 }
2686 }
2687 ID_FUTURE_SALTS => {
2689 self.inner.writer.lock().await.pending_ack.push(msg_id);
2703
2704 if body.len() >= 24 {
2705 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2706 let server_now = i32::from_le_bytes(body[12..16].try_into().unwrap());
2707 let count = u32::from_le_bytes(body[20..24].try_into().unwrap()) as usize;
2708
2709 let mut new_salts: Vec<FutureSalt> = Vec::with_capacity(count.clamp(0, 4096));
2712 for i in 0..count {
2713 let base = 24 + i * 16;
2714 if base + 16 > body.len() {
2715 break;
2716 }
2717 new_salts.push(FutureSalt {
2723 valid_since: i32::from_le_bytes(
2724 body[base..base + 4].try_into().unwrap(),
2725 ),
2726 salt: i64::from_le_bytes(body[base + 4..base + 12].try_into().unwrap()),
2727 valid_until: i32::from_le_bytes(
2728 body[base + 12..base + 16].try_into().unwrap(),
2729 ),
2730 });
2731 }
2732
2733 if !new_salts.is_empty() {
2734 new_salts.sort_by_key(|s| s.valid_since);
2737 let mut w = self.inner.writer.lock().await;
2738 w.salts = new_salts;
2739 w.start_salt_time = Some((server_now, std::time::Instant::now()));
2740
2741 let use_salt = w
2746 .salts
2747 .iter()
2748 .rev()
2749 .find(|s| s.valid_since + SALT_USE_DELAY <= server_now)
2750 .or_else(|| w.salts.first())
2751 .map(|s| s.salt);
2752 if let Some(salt) = use_salt {
2753 w.enc.salt = salt;
2754 tracing::debug!(
2755 "[layer] FutureSalts: stored {} salts, \
2756 active salt={salt:#x}",
2757 w.salts.len()
2758 );
2759 }
2760 }
2761
2762 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
2763 let mut w = self.inner.writer.lock().await;
2764 w.sent_bodies.remove(&req_msg_id);
2765 w.container_map.retain(|_, inner| *inner != req_msg_id);
2766 drop(w);
2767 let _ = tx.send(Ok(body));
2768 }
2769 }
2770 }
2771 ID_NEW_SESSION => {
2772 if body.len() >= 28 {
2777 let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
2778 let mut w = self.inner.writer.lock().await;
2779 w.pending_ack.push(msg_id);
2781 w.salts.clear();
2784 w.salts.push(FutureSalt {
2785 valid_since: 0,
2786 valid_until: i32::MAX,
2787 salt: server_salt,
2788 });
2789 w.enc.salt = server_salt;
2790 tracing::debug!(
2791 "[layer] new_session_created: salt pool reset to {server_salt:#x}"
2792 );
2793 }
2794 }
2795 ID_BAD_MSG_NOTIFY => {
2797 if body.len() < 20 {
2799 return;
2800 }
2801 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2802 let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
2803
2804 let description = match error_code {
2806 16 => "msg_id too low",
2807 17 => "msg_id too high",
2808 18 => "incorrect two lower order msg_id bits (bug)",
2809 19 => "container msg_id is same as previously received (bug)",
2810 20 => "message too old",
2811 32 => "msg_seqno too low",
2812 33 => "msg_seqno too high",
2813 34 => "even msg_seqno expected (bug)",
2814 35 => "odd msg_seqno expected (bug)",
2815 48 => "incorrect server salt",
2816 64 => "invalid container (bug)",
2817 _ => "unknown bad_msg code",
2818 };
2819
2820 if error_code == 32 || error_code == 33 {
2831 tracing::warn!(
2832 "[layer] bad_msg_notification: seq_no desync \
2833 (code={error_code}: {description}, bad_msg_id={bad_msg_id}); \
2834 performing session reset (new session_id + reconnect)"
2835 );
2836 self.inner
2838 .writer
2839 .lock()
2840 .await
2841 .enc
2842 .correct_seq_no(error_code);
2843 self.inner.writer.lock().await.sent_bodies.clear();
2846 {
2849 let mut pending = self.inner.pending.lock().await;
2850 for (_, tx) in pending.drain() {
2851 let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
2852 std::io::ErrorKind::ConnectionReset,
2853 format!(
2854 "session reset: bad_msg_notification \
2855 code={error_code} (seq_no desync)"
2856 ),
2857 ))));
2858 }
2859 }
2860 let _ = self.inner.write_half.lock().await.shutdown().await;
2864 return;
2865 }
2866
2867 let retryable = matches!(error_code, 16 | 17 | 48);
2868 let fatal = !retryable;
2869
2870 if fatal {
2871 tracing::error!(
2872 "[layer] bad_msg_notification (fatal): bad_msg_id={bad_msg_id} \
2873 code={error_code}: {description}"
2874 );
2875 } else {
2876 tracing::warn!(
2877 "[layer] bad_msg_notification: bad_msg_id={bad_msg_id} \
2878 code={error_code}: {description}"
2879 );
2880 }
2881
2882 let resend: Option<(Vec<u8>, i64, i64, FrameKind)> = {
2886 let mut w = self.inner.writer.lock().await;
2887
2888 if error_code == 16 || error_code == 17 {
2890 w.enc.correct_time_offset(msg_id);
2891 }
2892
2893 if retryable {
2894 let resolved_id = if w.sent_bodies.contains_key(&bad_msg_id) {
2898 bad_msg_id
2899 } else if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2900 w.container_map.remove(&bad_msg_id);
2901 inner_id
2902 } else {
2903 bad_msg_id
2904 };
2905
2906 if let Some(orig_body) = w.sent_bodies.remove(&resolved_id) {
2907 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2908 let fk = w.frame_kind.clone();
2909 w.sent_bodies.insert(new_msg_id, orig_body);
2910 Some((wire, resolved_id, new_msg_id, fk))
2912 } else {
2913 None
2914 }
2915 } else {
2916 w.sent_bodies.remove(&bad_msg_id);
2918 if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2919 w.sent_bodies.remove(&inner_id);
2920 w.container_map.remove(&bad_msg_id);
2921 }
2922 None
2923 }
2924 }; match resend {
2927 Some((wire, old_msg_id, new_msg_id, fk)) => {
2928 let has_waiter = {
2930 let mut pending = self.inner.pending.lock().await;
2931 if let Some(tx) = pending.remove(&old_msg_id) {
2932 pending.insert(new_msg_id, tx);
2933 true
2934 } else {
2935 false
2936 }
2937 };
2938 if has_waiter {
2939 if let Err(e) = send_frame_write(
2941 &mut *self.inner.write_half.lock().await,
2942 &wire,
2943 &fk,
2944 )
2945 .await
2946 {
2947 tracing::warn!("[layer] re-send failed: {e}");
2948 self.inner
2949 .writer
2950 .lock()
2951 .await
2952 .sent_bodies
2953 .remove(&new_msg_id);
2954 } else {
2955 tracing::debug!("[layer] re-sent {old_msg_id}→{new_msg_id}");
2956 }
2957 } else {
2958 self.inner
2959 .writer
2960 .lock()
2961 .await
2962 .sent_bodies
2963 .remove(&new_msg_id);
2964 }
2965 }
2966 None => {
2967 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
2969 let _ = tx.send(Err(InvocationError::Deserialize(format!(
2970 "bad_msg_notification code={error_code} ({description})"
2971 ))));
2972 }
2973 }
2974 }
2975 }
2976 ID_MSG_DETAILED_INFO => {
2978 if body.len() >= 20 {
2982 let answer_msg_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
2983 self.inner
2984 .writer
2985 .lock()
2986 .await
2987 .pending_ack
2988 .push(answer_msg_id);
2989 tracing::trace!(
2990 "[layer] MsgDetailedInfo: queued ack for answer_msg_id={answer_msg_id}"
2991 );
2992 }
2993 }
2994 ID_MSG_NEW_DETAIL_INFO => {
2995 if body.len() >= 12 {
2998 let answer_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2999 self.inner
3000 .writer
3001 .lock()
3002 .await
3003 .pending_ack
3004 .push(answer_msg_id);
3005 tracing::trace!("[layer] MsgNewDetailedInfo: queued ack for {answer_msg_id}");
3006 }
3007 }
3008 ID_MSG_RESEND_REQ => {
3010 if body.len() >= 12 {
3015 let count = u32::from_le_bytes(body[8..12].try_into().unwrap()) as usize;
3016 let mut resends: Vec<(Vec<u8>, i64, i64)> = Vec::new();
3017 {
3018 let mut w = self.inner.writer.lock().await;
3019 let fk = w.frame_kind.clone();
3020 for i in 0..count {
3021 let off = 12 + i * 8;
3022 if off + 8 > body.len() {
3023 break;
3024 }
3025 let resend_id =
3026 i64::from_le_bytes(body[off..off + 8].try_into().unwrap());
3027 if let Some(orig_body) = w.sent_bodies.remove(&resend_id) {
3028 let (wire, new_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
3029 let mut pending = self.inner.pending.lock().await;
3030 if let Some(tx) = pending.remove(&resend_id) {
3031 pending.insert(new_id, tx);
3032 }
3033 drop(pending);
3034 w.sent_bodies.insert(new_id, orig_body);
3035 resends.push((wire, resend_id, new_id));
3036 }
3037 }
3038 let _ = fk; }
3040 let fk = self.inner.writer.lock().await.frame_kind.clone();
3042 for (wire, resend_id, new_id) in resends {
3043 send_frame_write(&mut *self.inner.write_half.lock().await, &wire, &fk)
3044 .await
3045 .ok();
3046 tracing::debug!("[layer] MsgResendReq: resent {resend_id} → {new_id}");
3047 }
3048 }
3049 }
3050 0xe22045fc => {
3052 tracing::warn!("[layer] destroy_session_ok received: session terminated by server");
3053 }
3054 0x62d350c9 => {
3055 tracing::warn!("[layer] destroy_session_none received: session was already gone");
3056 }
3057 ID_UPDATES
3058 | ID_UPDATE_SHORT
3059 | ID_UPDATES_COMBINED
3060 | ID_UPDATE_SHORT_MSG
3061 | ID_UPDATE_SHORT_CHAT_MSG
3062 | ID_UPDATE_SHORT_SENT_MSG
3063 | ID_UPDATES_TOO_LONG => {
3064 self.inner.writer.lock().await.pending_ack.push(msg_id);
3066 self.dispatch_updates(&body).await;
3068 }
3069 _ => {}
3070 }
3071 }
3072
3073 fn update_sort_key(upd: &tl::enums::Update) -> i32 {
3083 use tl::enums::Update::*;
3084 match upd {
3085 NewMessage(u) => u.pts - u.pts_count,
3086 EditMessage(u) => u.pts - u.pts_count,
3087 DeleteMessages(u) => u.pts - u.pts_count,
3088 ReadHistoryInbox(u) => u.pts - u.pts_count,
3089 ReadHistoryOutbox(u) => u.pts - u.pts_count,
3090 NewChannelMessage(u) => u.pts - u.pts_count,
3091 EditChannelMessage(u) => u.pts - u.pts_count,
3092 DeleteChannelMessages(u) => u.pts - u.pts_count,
3093 _ => 0,
3094 }
3095 }
3096
3097 async fn dispatch_updates(&self, body: &[u8]) {
3102 if body.len() < 4 {
3103 return;
3104 }
3105 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
3106
3107 if cid == 0xe317af7e_u32 {
3109 tracing::warn!("[layer] updatesTooLong: getDifference");
3110 let c = self.clone();
3111 let utx = self.inner.update_tx.clone();
3112 tokio::spawn(async move {
3113 match c.get_difference().await {
3114 Ok(updates) => {
3115 for u in updates {
3116 if utx.try_send(u).is_err() {
3117 tracing::warn!("[layer] update channel full: dropping update");
3118 break;
3119 }
3120 }
3121 }
3122 Err(e) => tracing::warn!("[layer] getDifference after updatesTooLong: {e}"),
3123 }
3124 });
3125 return;
3126 }
3127
3128 if cid == 0x313bc7f8 {
3137 let mut cur = Cursor::from_slice(&body[4..]);
3139 let m = match tl::types::UpdateShortMessage::deserialize(&mut cur) {
3140 Ok(m) => m,
3141 Err(e) => {
3142 tracing::debug!("[layer] updateShortMessage deserialize error: {e}");
3143 return;
3144 }
3145 };
3146 let pts = m.pts;
3147 let pts_count = m.pts_count;
3148 let upd = update::Update::NewMessage(update::make_short_dm(m));
3149 let c = self.clone();
3150 let utx = self.inner.update_tx.clone();
3151 tokio::spawn(async move {
3152 match c
3153 .check_and_fill_gap(pts, pts_count, Some(attach_client_to_update(upd, &c)))
3154 .await
3155 {
3156 Ok(updates) => {
3157 for u in updates {
3158 if utx.try_send(u).is_err() {
3159 tracing::warn!("[layer] update channel full: dropping update");
3160 }
3161 }
3162 }
3163 Err(e) => tracing::warn!("[layer] updateShortMessage gap fill: {e}"),
3164 }
3165 });
3166 return;
3167 }
3168 if cid == 0x4d6deea5 {
3169 let mut cur = Cursor::from_slice(&body[4..]);
3171 let m = match tl::types::UpdateShortChatMessage::deserialize(&mut cur) {
3172 Ok(m) => m,
3173 Err(e) => {
3174 tracing::debug!("[layer] updateShortChatMessage deserialize error: {e}");
3175 return;
3176 }
3177 };
3178 let pts = m.pts;
3179 let pts_count = m.pts_count;
3180 let upd = update::Update::NewMessage(update::make_short_chat(m));
3181 let c = self.clone();
3182 let utx = self.inner.update_tx.clone();
3183 tokio::spawn(async move {
3184 match c
3185 .check_and_fill_gap(pts, pts_count, Some(attach_client_to_update(upd, &c)))
3186 .await
3187 {
3188 Ok(updates) => {
3189 for u in updates {
3190 if utx.try_send(u).is_err() {
3191 tracing::warn!("[layer] update channel full: dropping update");
3192 }
3193 }
3194 }
3195 Err(e) => tracing::warn!("[layer] updateShortChatMessage gap fill: {e}"),
3196 }
3197 });
3198 return;
3199 }
3200
3201 if cid == ID_UPDATE_SHORT_SENT_MSG {
3205 let mut cur = Cursor::from_slice(&body[4..]);
3206 match tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
3207 Ok(m) => {
3208 let pts = m.pts;
3209 let pts_count = m.pts_count;
3210 tracing::debug!(
3211 "[layer] updateShortSentMessage (push): pts={pts} pts_count={pts_count}: advancing pts"
3212 );
3213 let c = self.clone();
3214 let utx = self.inner.update_tx.clone();
3215 tokio::spawn(async move {
3216 match c.check_and_fill_gap(pts, pts_count, None).await {
3217 Ok(replayed) => {
3218 for u in replayed {
3219 if utx.try_send(u).is_err() {
3220 tracing::warn!(
3221 "[layer] update channel full: dropping update"
3222 );
3223 }
3224 }
3225 }
3226 Err(e) => tracing::warn!(
3227 "[layer] updateShortSentMessage push pts advance: {e}"
3228 ),
3229 }
3230 });
3231 }
3232 Err(e) => {
3233 tracing::debug!("[layer] updateShortSentMessage push deserialize error: {e}")
3234 }
3235 }
3236 return;
3237 }
3238
3239 use crate::pts::PtsCheckResult;
3246 use layer_tl_types::{Cursor, Deserializable};
3247
3248 struct ParsedContainer {
3254 seq_info: Option<(i32, i32)>,
3255 users: Vec<tl::enums::User>,
3256 chats: Vec<tl::enums::Chat>,
3257 updates: Vec<tl::enums::Update>,
3258 }
3259
3260 let mut cur = Cursor::from_slice(body);
3261 let parsed: ParsedContainer = match cid {
3262 0x74ae4240 => {
3263 match tl::enums::Updates::deserialize(&mut cur) {
3265 Ok(tl::enums::Updates::Updates(u)) => ParsedContainer {
3266 seq_info: Some((u.seq, u.seq)),
3267 users: u.users,
3268 chats: u.chats,
3269 updates: u.updates,
3270 },
3271 _ => ParsedContainer {
3272 seq_info: None,
3273 users: vec![],
3274 chats: vec![],
3275 updates: vec![],
3276 },
3277 }
3278 }
3279 0x725b04c3 => {
3280 match tl::enums::Updates::deserialize(&mut cur) {
3282 Ok(tl::enums::Updates::Combined(u)) => ParsedContainer {
3283 seq_info: Some((u.seq, u.seq_start)),
3284 users: u.users,
3285 chats: u.chats,
3286 updates: u.updates,
3287 },
3288 _ => ParsedContainer {
3289 seq_info: None,
3290 users: vec![],
3291 chats: vec![],
3292 updates: vec![],
3293 },
3294 }
3295 }
3296 0x78d4dec1 => {
3297 match tl::types::UpdateShort::deserialize(&mut Cursor::from_slice(body)) {
3299 Ok(u) => ParsedContainer {
3300 seq_info: None,
3301 users: vec![],
3302 chats: vec![],
3303 updates: vec![u.update],
3304 },
3305 Err(_) => ParsedContainer {
3306 seq_info: None,
3307 users: vec![],
3308 chats: vec![],
3309 updates: vec![],
3310 },
3311 }
3312 }
3313 _ => ParsedContainer {
3314 seq_info: None,
3315 users: vec![],
3316 chats: vec![],
3317 updates: vec![],
3318 },
3319 };
3320
3321 if !parsed.users.is_empty() || !parsed.chats.is_empty() {
3323 self.cache_users_and_chats(&parsed.users, &parsed.chats)
3324 .await;
3325 }
3326
3327 if let Some((seq, seq_start)) = parsed.seq_info
3329 && seq != 0
3330 {
3331 let result = self.inner.pts_state.lock().await.check_seq(seq, seq_start);
3332 match result {
3333 PtsCheckResult::Ok => {
3334 }
3336 PtsCheckResult::Duplicate => {
3337 tracing::debug!(
3339 "[layer] seq duplicate (seq={seq}, seq_start={seq_start}): dropping container"
3340 );
3341 return;
3342 }
3343 PtsCheckResult::Gap { expected, got } => {
3344 tracing::warn!(
3347 "[layer] seq gap: expected {expected}, got {got}: getDifference"
3348 );
3349 let c = self.clone();
3350 let utx = self.inner.update_tx.clone();
3351 tokio::spawn(async move {
3352 match c.get_difference().await {
3353 Ok(updates) => {
3354 for u in updates {
3355 if utx.try_send(u).is_err() {
3356 tracing::warn!(
3357 "[layer] update channel full: dropping seq gap update"
3358 );
3359 break;
3360 }
3361 }
3362 }
3363 Err(e) => tracing::warn!("[layer] seq gap fill: {e}"),
3364 }
3365 });
3366 return; }
3368 }
3369 }
3370
3371 let mut raw: Vec<tl::enums::Update> = parsed.updates;
3372
3373 raw.sort_by_key(Self::update_sort_key);
3378
3379 for upd in raw {
3380 self.dispatch_single_update(upd).await;
3381 }
3382
3383 if let Some((seq, _)) = parsed.seq_info
3389 && seq != 0
3390 {
3391 self.inner.pts_state.lock().await.advance_seq(seq);
3392 }
3393 }
3394
3395 async fn dispatch_single_update(&self, upd: tl::enums::Update) {
3398 enum Kind {
3401 GlobalPts {
3402 pts: i32,
3403 pts_count: i32,
3404 carry: bool,
3405 },
3406 ChannelPts {
3407 channel_id: i64,
3408 pts: i32,
3409 pts_count: i32,
3410 carry: bool,
3411 },
3412 Qts {
3413 qts: i32,
3414 },
3415 Passthrough,
3416 }
3417
3418 fn ch_from_msg(msg: &tl::enums::Message) -> i64 {
3419 if let tl::enums::Message::Message(m) = msg
3420 && let tl::enums::Peer::Channel(c) = &m.peer_id
3421 {
3422 return c.channel_id;
3423 }
3424 0
3425 }
3426
3427 let kind = {
3428 use tl::enums::Update::*;
3429 match &upd {
3430 NewMessage(u) => Kind::GlobalPts {
3431 pts: u.pts,
3432 pts_count: u.pts_count,
3433 carry: true,
3434 },
3435 EditMessage(u) => Kind::GlobalPts {
3436 pts: u.pts,
3437 pts_count: u.pts_count,
3438 carry: true,
3439 },
3440 DeleteMessages(u) => Kind::GlobalPts {
3441 pts: u.pts,
3442 pts_count: u.pts_count,
3443 carry: true,
3444 },
3445 ReadHistoryInbox(u) => Kind::GlobalPts {
3446 pts: u.pts,
3447 pts_count: u.pts_count,
3448 carry: false,
3449 },
3450 ReadHistoryOutbox(u) => Kind::GlobalPts {
3451 pts: u.pts,
3452 pts_count: u.pts_count,
3453 carry: false,
3454 },
3455 NewChannelMessage(u) => Kind::ChannelPts {
3456 channel_id: ch_from_msg(&u.message),
3457 pts: u.pts,
3458 pts_count: u.pts_count,
3459 carry: true,
3460 },
3461 EditChannelMessage(u) => Kind::ChannelPts {
3462 channel_id: ch_from_msg(&u.message),
3463 pts: u.pts,
3464 pts_count: u.pts_count,
3465 carry: true,
3466 },
3467 DeleteChannelMessages(u) => Kind::ChannelPts {
3468 channel_id: u.channel_id,
3469 pts: u.pts,
3470 pts_count: u.pts_count,
3471 carry: true,
3472 },
3473 NewEncryptedMessage(u) => Kind::Qts { qts: u.qts },
3474 _ => Kind::Passthrough,
3475 }
3476 };
3477
3478 let high = update::from_single_update_pub(upd);
3479
3480 let to_send: Vec<update::Update> = match kind {
3481 Kind::GlobalPts {
3482 pts,
3483 pts_count,
3484 carry,
3485 } => {
3486 let first = if carry { high.into_iter().next() } else { None };
3487 let c = self.clone();
3491 let utx = self.inner.update_tx.clone();
3492 tokio::spawn(async move {
3493 match c.check_and_fill_gap(pts, pts_count, first).await {
3494 Ok(v) => {
3495 for u in v {
3496 let u = attach_client_to_update(u, &c);
3497 if utx.try_send(u).is_err() {
3498 tracing::warn!("[layer] update channel full: dropping update");
3499 break;
3500 }
3501 }
3502 }
3503 Err(e) => tracing::warn!("[layer] pts gap: {e}"),
3504 }
3505 });
3506 vec![]
3507 }
3508 Kind::ChannelPts {
3509 channel_id,
3510 pts,
3511 pts_count,
3512 carry,
3513 } => {
3514 let first = if carry { high.into_iter().next() } else { None };
3515 if channel_id != 0 {
3516 let c = self.clone();
3518 let utx = self.inner.update_tx.clone();
3519 tokio::spawn(async move {
3520 match c
3521 .check_and_fill_channel_gap(channel_id, pts, pts_count, first)
3522 .await
3523 {
3524 Ok(v) => {
3525 for u in v {
3526 let u = attach_client_to_update(u, &c);
3527 if utx.try_send(u).is_err() {
3528 tracing::warn!(
3529 "[layer] update channel full: dropping update"
3530 );
3531 break;
3532 }
3533 }
3534 }
3535 Err(e) => tracing::warn!("[layer] ch pts gap: {e}"),
3536 }
3537 });
3538 vec![]
3539 } else {
3540 first.into_iter().collect()
3541 }
3542 }
3543 Kind::Qts { qts } => {
3544 let c = self.clone();
3546 tokio::spawn(async move {
3547 if let Err(e) = c.check_and_fill_qts_gap(qts, 1).await {
3548 tracing::warn!("[layer] qts gap: {e}");
3549 }
3550 });
3551 vec![]
3552 }
3553 Kind::Passthrough => high
3554 .into_iter()
3555 .map(|u| match u {
3556 update::Update::NewMessage(msg) => {
3557 update::Update::NewMessage(msg.with_client(self.clone()))
3558 }
3559 update::Update::MessageEdited(msg) => {
3560 update::Update::MessageEdited(msg.with_client(self.clone()))
3561 }
3562 other => other,
3563 })
3564 .collect(),
3565 };
3566
3567 for u in to_send {
3568 if self.inner.update_tx.try_send(u).is_err() {
3569 tracing::warn!("[layer] update channel full: dropping update");
3570 }
3571 }
3572 }
3573
3574 async fn do_reconnect_loop(
3584 &self,
3585 initial_delay_ms: u64,
3586 rh: &mut OwnedReadHalf,
3587 fk: &mut FrameKind,
3588 ak: &mut [u8; 256],
3589 sid: &mut i64,
3590 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
3591 ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
3592 let mut delay_ms = if initial_delay_ms == 0 {
3593 0
3596 } else {
3597 initial_delay_ms.max(RECONNECT_BASE_MS)
3598 };
3599 loop {
3600 tracing::debug!("[layer] Reconnecting in {delay_ms} ms …");
3601 tokio::select! {
3602 _ = sleep(Duration::from_millis(delay_ms)) => {}
3603 hint = network_hint_rx.recv() => {
3604 hint?; tracing::debug!("[layer] Network hint → skipping backoff, reconnecting now");
3606 }
3607 }
3608
3609 match self.do_reconnect(ak, fk).await {
3610 Ok((new_rh, new_fk, new_ak, new_sid)) => {
3611 *rh = new_rh;
3612 *fk = new_fk;
3613 *ak = new_ak;
3614 *sid = new_sid;
3615 tracing::debug!("[layer] TCP reconnected ✓: initialising session …");
3616
3617 let (init_tx, init_rx) = oneshot::channel();
3621 let c = self.clone();
3622 let utx = self.inner.update_tx.clone();
3623 tokio::spawn(async move {
3624 let result = {
3629 let mut attempt = 0u32;
3630 const MAX_ATTEMPTS: u32 = 5;
3631 loop {
3632 match c.init_connection().await {
3633 Ok(()) => break Ok(()),
3634 Err(InvocationError::Rpc(ref r))
3635 if r.flood_wait_seconds().is_some() =>
3636 {
3637 let secs = r.flood_wait_seconds().unwrap();
3638 tracing::warn!(
3639 "[layer] init_connection FLOOD_WAIT_{secs}: \
3640 waiting before retry"
3641 );
3642 sleep(Duration::from_secs(secs + 1)).await;
3643 }
3644 Err(InvocationError::Rpc(ref r))
3645 if r.code == 401 && attempt < MAX_ATTEMPTS =>
3646 {
3647 let delay = Duration::from_millis(500 * (1u64 << attempt));
3648 tracing::warn!(
3649 "[layer] init_connection AUTH_KEY_UNREGISTERED \
3650 (attempt {}/{MAX_ATTEMPTS}): retrying in {delay:?}",
3651 attempt + 1,
3652 );
3653 sleep(delay).await;
3654 attempt += 1;
3655 }
3656 Err(e) => break Err(e),
3657 }
3658 }
3659 };
3660 if result.is_ok() {
3661 let missed = {
3666 let mut attempt = 0u32;
3667 const MAX_ATTEMPTS: u32 = 5;
3668 loop {
3669 match c.get_difference().await {
3670 Ok(updates) => break updates,
3671 Err(ref e)
3672 if matches!(e,
3673 InvocationError::Rpc(r) if r.code == 401)
3674 && attempt < MAX_ATTEMPTS =>
3675 {
3676 let delay =
3677 Duration::from_millis(500 * (1u64 << attempt));
3678 tracing::warn!(
3679 "[layer] getDifference AUTH_KEY_UNREGISTERED \
3680 (attempt {}/{MAX_ATTEMPTS}): retrying in \
3681 {delay:?}",
3682 attempt + 1,
3683 );
3684 sleep(delay).await;
3685 attempt += 1;
3686 }
3687 Err(ref e)
3688 if matches!(e,
3689 InvocationError::Rpc(r) if r.code == 401) =>
3690 {
3691 tracing::warn!(
3692 "[layer] getDifference AUTH_KEY_UNREGISTERED \
3693 after {MAX_ATTEMPTS} retries: falling back \
3694 to sync_pts_state"
3695 );
3696 let _ = c.sync_pts_state().await;
3697 break vec![];
3698 }
3699 Err(e) => {
3700 tracing::warn!(
3701 "[layer] getDifference failed after reconnect: \
3702 {e}"
3703 );
3704 break vec![];
3705 }
3706 }
3707 }
3708 };
3709 for u in missed {
3710 if utx.try_send(attach_client_to_update(u, &c)).is_err() {
3711 tracing::warn!(
3712 "[layer] update channel full: dropping catch-up update"
3713 );
3714 break;
3715 }
3716 }
3717 }
3718 let _ = init_tx.send(result);
3719 });
3720 return Some(init_rx);
3721 }
3722 Err(e) => {
3723 tracing::warn!("[layer] Reconnect attempt failed: {e}");
3724 let next = delay_ms
3728 .saturating_mul(2)
3729 .clamp(RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1_000);
3730 delay_ms = jitter_delay(next).as_millis() as u64;
3731 }
3732 }
3733 }
3734 }
3735
3736 async fn do_reconnect(
3738 &self,
3739 _old_auth_key: &[u8; 256],
3740 _old_frame_kind: &FrameKind,
3741 ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
3742 let home_dc_id = *self.inner.home_dc_id.lock().await;
3743 let (addr, saved_key, first_salt, time_offset) = {
3744 let opts = self.inner.dc_options.lock().await;
3745 match opts.get(&home_dc_id) {
3746 Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
3747 None => (
3748 crate::dc_migration::fallback_dc_addr(home_dc_id).to_string(),
3749 None,
3750 0,
3751 0,
3752 ),
3753 }
3754 };
3755 let socks5 = self.inner.socks5.clone();
3756 let mtproxy = self.inner.mtproxy.clone();
3757 let transport = self.inner.transport.clone();
3758
3759 let new_conn = if let Some(key) = saved_key {
3760 tracing::debug!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
3761 match Connection::connect_with_key(
3762 &addr,
3763 key,
3764 first_salt,
3765 time_offset,
3766 socks5.as_ref(),
3767 &transport,
3768 home_dc_id as i16,
3769 )
3770 .await
3771 {
3772 Ok(c) => c,
3773 Err(e) => {
3774 return Err(e);
3775 }
3776 }
3777 } else {
3778 Connection::connect_raw(
3779 &addr,
3780 socks5.as_ref(),
3781 mtproxy.as_ref(),
3782 &transport,
3783 home_dc_id as i16,
3784 )
3785 .await?
3786 };
3787
3788 let (new_writer, new_wh, new_read, new_fk) = new_conn.into_writer();
3789 let new_ak = new_writer.enc.auth_key_bytes();
3790 let new_sid = new_writer.enc.session_id();
3791 *self.inner.writer.lock().await = new_writer;
3792 *self.inner.write_half.lock().await = new_wh;
3793
3794 self.inner
3802 .salt_request_in_flight
3803 .store(false, std::sync::atomic::Ordering::SeqCst);
3804
3805 {
3810 let mut opts = self.inner.dc_options.lock().await;
3811 if let Some(entry) = opts.get_mut(&home_dc_id) {
3812 entry.auth_key = Some(new_ak);
3813 }
3814 }
3815
3816 Ok((new_read, new_fk, new_ak, new_sid))
3828 }
3829
3830 pub async fn send_message(
3834 &self,
3835 peer: &str,
3836 text: &str,
3837 ) -> Result<update::IncomingMessage, InvocationError> {
3838 let p = self.resolve_peer(peer).await?;
3839 self.send_message_to_peer(p, text).await
3840 }
3841
3842 pub async fn send_message_to_peer(
3847 &self,
3848 peer: impl Into<PeerRef>,
3849 text: &str,
3850 ) -> Result<update::IncomingMessage, InvocationError> {
3851 self.send_message_to_peer_ex(peer, &InputMessage::text(text))
3852 .await
3853 }
3854
3855 pub async fn send_message_to_peer_ex(
3860 &self,
3861 peer: impl Into<PeerRef>,
3862 msg: &InputMessage,
3863 ) -> Result<update::IncomingMessage, InvocationError> {
3864 let peer = peer.into().resolve(self).await?;
3865 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
3866 let schedule = if msg.schedule_once_online {
3867 Some(0x7FFF_FFFEi32)
3868 } else {
3869 msg.schedule_date
3870 };
3871
3872 if let Some(media) = &msg.media {
3874 let req = tl::functions::messages::SendMedia {
3875 silent: msg.silent,
3876 background: msg.background,
3877 clear_draft: msg.clear_draft,
3878 noforwards: false,
3879 update_stickersets_order: false,
3880 invert_media: msg.invert_media,
3881 allow_paid_floodskip: false,
3882 peer: input_peer,
3883 reply_to: msg.reply_header(),
3884 media: media.clone(),
3885 message: msg.text.clone(),
3886 random_id: random_i64(),
3887 reply_markup: msg.reply_markup.clone(),
3888 entities: msg.entities.clone(),
3889 schedule_date: schedule,
3890 schedule_repeat_period: None,
3891 send_as: None,
3892 quick_reply_shortcut: None,
3893 effect: None,
3894 allow_paid_stars: None,
3895 suggested_post: None,
3896 };
3897 let body = self.rpc_call_raw_pub(&req).await?;
3898 return Ok(self.extract_sent_message(&body, msg, &peer));
3899 }
3900
3901 let req = tl::functions::messages::SendMessage {
3902 no_webpage: msg.no_webpage,
3903 silent: msg.silent,
3904 background: msg.background,
3905 clear_draft: msg.clear_draft,
3906 noforwards: false,
3907 update_stickersets_order: false,
3908 invert_media: msg.invert_media,
3909 allow_paid_floodskip: false,
3910 peer: input_peer,
3911 reply_to: msg.reply_header(),
3912 message: msg.text.clone(),
3913 random_id: random_i64(),
3914 reply_markup: msg.reply_markup.clone(),
3915 entities: msg.entities.clone(),
3916 schedule_date: schedule,
3917 schedule_repeat_period: None,
3918 send_as: None,
3919 quick_reply_shortcut: None,
3920 effect: None,
3921 allow_paid_stars: None,
3922 suggested_post: None,
3923 };
3924 let body = self.rpc_call_raw(&req).await?;
3925 Ok(self.extract_sent_message(&body, msg, &peer))
3926 }
3927
3928 pub async fn send_message_with_input_peer(
3932 &self,
3933 input_peer: tl::enums::InputPeer,
3934 msg: &InputMessage,
3935 ) -> Result<(), InvocationError> {
3936 let schedule = if msg.schedule_once_online {
3937 Some(0x7FFF_FFFEi32)
3938 } else {
3939 msg.schedule_date
3940 };
3941
3942 if let Some(media) = &msg.media {
3943 let req = tl::functions::messages::SendMedia {
3944 silent: msg.silent,
3945 background: msg.background,
3946 clear_draft: msg.clear_draft,
3947 noforwards: false,
3948 update_stickersets_order: false,
3949 invert_media: msg.invert_media,
3950 allow_paid_floodskip: false,
3951 peer: input_peer,
3952 reply_to: msg.reply_header(),
3953 media: media.clone(),
3954 message: msg.text.clone(),
3955 random_id: random_i64(),
3956 reply_markup: msg.reply_markup.clone(),
3957 entities: msg.entities.clone(),
3958 schedule_date: schedule,
3959 schedule_repeat_period: None,
3960 send_as: None,
3961 quick_reply_shortcut: None,
3962 effect: None,
3963 allow_paid_stars: None,
3964 suggested_post: None,
3965 };
3966 self.rpc_call_raw_pub(&req).await?;
3967 return Ok(());
3968 }
3969
3970 let req = tl::functions::messages::SendMessage {
3971 no_webpage: msg.no_webpage,
3972 silent: msg.silent,
3973 background: msg.background,
3974 clear_draft: msg.clear_draft,
3975 noforwards: false,
3976 update_stickersets_order: false,
3977 invert_media: msg.invert_media,
3978 allow_paid_floodskip: false,
3979 peer: input_peer,
3980 reply_to: msg.reply_header(),
3981 message: msg.text.clone(),
3982 random_id: random_i64(),
3983 reply_markup: msg.reply_markup.clone(),
3984 entities: msg.entities.clone(),
3985 schedule_date: schedule,
3986 schedule_repeat_period: None,
3987 send_as: None,
3988 quick_reply_shortcut: None,
3989 effect: None,
3990 allow_paid_stars: None,
3991 suggested_post: None,
3992 };
3993 self.rpc_call_raw(&req).await?;
3994 Ok(())
3995 }
3996
3997 fn extract_sent_message(
4001 &self,
4002 body: &[u8],
4003 input: &InputMessage,
4004 peer: &tl::enums::Peer,
4005 ) -> update::IncomingMessage {
4006 if body.len() < 4 {
4007 return self.synthetic_sent(input, peer, 0, 0);
4008 }
4009 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
4010
4011 if cid == 0x74ae4240 || cid == 0x725b04c3 {
4013 let mut cur = Cursor::from_slice(body);
4014 if let Ok(tl::enums::Updates::Updates(u)) = tl::enums::Updates::deserialize(&mut cur) {
4015 for upd in &u.updates {
4016 if let tl::enums::Update::NewMessage(nm) = upd {
4017 return update::IncomingMessage::from_raw(nm.message.clone())
4018 .with_client(self.clone());
4019 }
4020 if let tl::enums::Update::NewChannelMessage(nm) = upd {
4021 return update::IncomingMessage::from_raw(nm.message.clone())
4022 .with_client(self.clone());
4023 }
4024 }
4025 }
4026 if let Ok(tl::enums::Updates::Combined(u)) =
4027 tl::enums::Updates::deserialize(&mut Cursor::from_slice(body))
4028 {
4029 for upd in &u.updates {
4030 if let tl::enums::Update::NewMessage(nm) = upd {
4031 return update::IncomingMessage::from_raw(nm.message.clone())
4032 .with_client(self.clone());
4033 }
4034 if let tl::enums::Update::NewChannelMessage(nm) = upd {
4035 return update::IncomingMessage::from_raw(nm.message.clone())
4036 .with_client(self.clone());
4037 }
4038 }
4039 }
4040 }
4041
4042 if cid == 0x9015e101 {
4045 let mut cur = Cursor::from_slice(&body[4..]);
4046 if let Ok(sent) = tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
4047 return self.synthetic_sent_from_short(sent, input, peer);
4048 }
4049 }
4050
4051 if cid == 0x313bc7f8 {
4053 let mut cur = Cursor::from_slice(&body[4..]);
4054 if let Ok(m) = tl::types::UpdateShortMessage::deserialize(&mut cur) {
4055 let msg = tl::types::Message {
4056 out: m.out,
4057 mentioned: m.mentioned,
4058 media_unread: m.media_unread,
4059 silent: m.silent,
4060 post: false,
4061 from_scheduled: false,
4062 legacy: false,
4063 edit_hide: false,
4064 pinned: false,
4065 noforwards: false,
4066 invert_media: false,
4067 offline: false,
4068 video_processing_pending: false,
4069 paid_suggested_post_stars: false,
4070 paid_suggested_post_ton: false,
4071 id: m.id,
4072 from_id: Some(tl::enums::Peer::User(tl::types::PeerUser {
4073 user_id: m.user_id,
4074 })),
4075 peer_id: tl::enums::Peer::User(tl::types::PeerUser { user_id: m.user_id }),
4076 saved_peer_id: None,
4077 fwd_from: m.fwd_from,
4078 via_bot_id: m.via_bot_id,
4079 via_business_bot_id: None,
4080 reply_to: m.reply_to,
4081 date: m.date,
4082 message: m.message,
4083 media: None,
4084 reply_markup: None,
4085 entities: m.entities,
4086 views: None,
4087 forwards: None,
4088 replies: None,
4089 edit_date: None,
4090 post_author: None,
4091 grouped_id: None,
4092 reactions: None,
4093 restriction_reason: None,
4094 ttl_period: None,
4095 quick_reply_shortcut_id: None,
4096 effect: None,
4097 factcheck: None,
4098 report_delivery_until_date: None,
4099 paid_message_stars: None,
4100 suggested_post: None,
4101 from_rank: None,
4102 from_boosts_applied: None,
4103 schedule_repeat_period: None,
4104 summary_from_language: None,
4105 };
4106 return update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
4107 .with_client(self.clone());
4108 }
4109 }
4110
4111 self.synthetic_sent(input, peer, 0, 0)
4113 }
4114
4115 fn synthetic_sent_from_short(
4117 &self,
4118 sent: tl::types::UpdateShortSentMessage,
4119 input: &InputMessage,
4120 peer: &tl::enums::Peer,
4121 ) -> update::IncomingMessage {
4122 let msg = tl::types::Message {
4123 out: sent.out,
4124 mentioned: false,
4125 media_unread: false,
4126 silent: input.silent,
4127 post: false,
4128 from_scheduled: false,
4129 legacy: false,
4130 edit_hide: false,
4131 pinned: false,
4132 noforwards: false,
4133 invert_media: input.invert_media,
4134 offline: false,
4135 video_processing_pending: false,
4136 paid_suggested_post_stars: false,
4137 paid_suggested_post_ton: false,
4138 id: sent.id,
4139 from_id: None,
4140 from_boosts_applied: None,
4141 from_rank: None,
4142 peer_id: peer.clone(),
4143 saved_peer_id: None,
4144 fwd_from: None,
4145 via_bot_id: None,
4146 via_business_bot_id: None,
4147 reply_to: input.reply_to.map(|id| {
4148 tl::enums::MessageReplyHeader::MessageReplyHeader(tl::types::MessageReplyHeader {
4149 reply_to_scheduled: false,
4150 forum_topic: false,
4151 quote: false,
4152 reply_to_msg_id: Some(id),
4153 reply_to_peer_id: None,
4154 reply_from: None,
4155 reply_media: None,
4156 reply_to_top_id: None,
4157 quote_text: None,
4158 quote_entities: None,
4159 quote_offset: None,
4160 todo_item_id: None,
4161 poll_option: None,
4162 })
4163 }),
4164 date: sent.date,
4165 message: input.text.clone(),
4166 media: sent.media,
4167 reply_markup: input.reply_markup.clone(),
4168 entities: sent.entities,
4169 views: None,
4170 forwards: None,
4171 replies: None,
4172 edit_date: None,
4173 post_author: None,
4174 grouped_id: None,
4175 reactions: None,
4176 restriction_reason: None,
4177 ttl_period: sent.ttl_period,
4178 quick_reply_shortcut_id: None,
4179 effect: None,
4180 factcheck: None,
4181 report_delivery_until_date: None,
4182 paid_message_stars: None,
4183 suggested_post: None,
4184 schedule_repeat_period: None,
4185 summary_from_language: None,
4186 };
4187 update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
4188 .with_client(self.clone())
4189 }
4190
4191 fn synthetic_sent(
4193 &self,
4194 input: &InputMessage,
4195 peer: &tl::enums::Peer,
4196 id: i32,
4197 date: i32,
4198 ) -> update::IncomingMessage {
4199 let msg = tl::types::Message {
4200 out: true,
4201 mentioned: false,
4202 media_unread: false,
4203 silent: input.silent,
4204 post: false,
4205 from_scheduled: false,
4206 legacy: false,
4207 edit_hide: false,
4208 pinned: false,
4209 noforwards: false,
4210 invert_media: input.invert_media,
4211 offline: false,
4212 video_processing_pending: false,
4213 paid_suggested_post_stars: false,
4214 paid_suggested_post_ton: false,
4215 id,
4216 from_id: None,
4217 from_boosts_applied: None,
4218 from_rank: None,
4219 peer_id: peer.clone(),
4220 saved_peer_id: None,
4221 fwd_from: None,
4222 via_bot_id: None,
4223 via_business_bot_id: None,
4224 reply_to: input.reply_to.map(|rid| {
4225 tl::enums::MessageReplyHeader::MessageReplyHeader(tl::types::MessageReplyHeader {
4226 reply_to_scheduled: false,
4227 forum_topic: false,
4228 quote: false,
4229 reply_to_msg_id: Some(rid),
4230 reply_to_peer_id: None,
4231 reply_from: None,
4232 reply_media: None,
4233 reply_to_top_id: None,
4234 quote_text: None,
4235 quote_entities: None,
4236 quote_offset: None,
4237 todo_item_id: None,
4238 poll_option: None,
4239 })
4240 }),
4241 date,
4242 message: input.text.clone(),
4243 media: None,
4244 reply_markup: input.reply_markup.clone(),
4245 entities: input.entities.clone(),
4246 views: None,
4247 forwards: None,
4248 replies: None,
4249 edit_date: None,
4250 post_author: None,
4251 grouped_id: None,
4252 reactions: None,
4253 restriction_reason: None,
4254 ttl_period: None,
4255 quick_reply_shortcut_id: None,
4256 effect: None,
4257 factcheck: None,
4258 report_delivery_until_date: None,
4259 paid_message_stars: None,
4260 suggested_post: None,
4261 schedule_repeat_period: None,
4262 summary_from_language: None,
4263 };
4264 update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
4265 .with_client(self.clone())
4266 }
4267
4268 pub async fn send_to_self(
4270 &self,
4271 text: &str,
4272 ) -> Result<update::IncomingMessage, InvocationError> {
4273 let req = tl::functions::messages::SendMessage {
4274 no_webpage: false,
4275 silent: false,
4276 background: false,
4277 clear_draft: false,
4278 noforwards: false,
4279 update_stickersets_order: false,
4280 invert_media: false,
4281 allow_paid_floodskip: false,
4282 peer: tl::enums::InputPeer::PeerSelf,
4283 reply_to: None,
4284 message: text.to_string(),
4285 random_id: random_i64(),
4286 reply_markup: None,
4287 entities: None,
4288 schedule_date: None,
4289 schedule_repeat_period: None,
4290 send_as: None,
4291 quick_reply_shortcut: None,
4292 effect: None,
4293 allow_paid_stars: None,
4294 suggested_post: None,
4295 };
4296 let body = self.rpc_call_raw(&req).await?;
4297 let self_peer = tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 });
4298 Ok(self.extract_sent_message(&body, &InputMessage::text(text), &self_peer))
4299 }
4300
4301 pub async fn edit_message(
4303 &self,
4304 peer: impl Into<PeerRef>,
4305 message_id: i32,
4306 new_text: &str,
4307 ) -> Result<(), InvocationError> {
4308 let peer = peer.into().resolve(self).await?;
4309 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4310 let req = tl::functions::messages::EditMessage {
4311 no_webpage: false,
4312 invert_media: false,
4313 peer: input_peer,
4314 id: message_id,
4315 message: Some(new_text.to_string()),
4316 media: None,
4317 reply_markup: None,
4318 entities: None,
4319 schedule_date: None,
4320 schedule_repeat_period: None,
4321 quick_reply_shortcut_id: None,
4322 };
4323 self.rpc_write(&req).await
4324 }
4325
4326 pub async fn forward_messages(
4328 &self,
4329 destination: impl Into<PeerRef>,
4330 message_ids: &[i32],
4331 source: impl Into<PeerRef>,
4332 ) -> Result<(), InvocationError> {
4333 let dest = destination.into().resolve(self).await?;
4334 let src = source.into().resolve(self).await?;
4335 let cache = self.inner.peer_cache.read().await;
4336 let to_peer = cache.peer_to_input(&dest);
4337 let from_peer = cache.peer_to_input(&src);
4338 drop(cache);
4339
4340 let req = tl::functions::messages::ForwardMessages {
4341 silent: false,
4342 background: false,
4343 with_my_score: false,
4344 drop_author: false,
4345 drop_media_captions: false,
4346 noforwards: false,
4347 from_peer,
4348 id: message_ids.to_vec(),
4349 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
4350 to_peer,
4351 top_msg_id: None,
4352 reply_to: None,
4353 schedule_date: None,
4354 schedule_repeat_period: None,
4355 send_as: None,
4356 quick_reply_shortcut: None,
4357 effect: None,
4358 video_timestamp: None,
4359 allow_paid_stars: None,
4360 allow_paid_floodskip: false,
4361 suggested_post: None,
4362 };
4363 self.rpc_write(&req).await
4364 }
4365
4366 pub async fn forward_messages_returning(
4371 &self,
4372 destination: impl Into<PeerRef>,
4373 message_ids: &[i32],
4374 source: impl Into<PeerRef>,
4375 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4376 let dest = destination.into().resolve(self).await?;
4377 let src = source.into().resolve(self).await?;
4378 let cache = self.inner.peer_cache.read().await;
4379 let to_peer = cache.peer_to_input(&dest);
4380 let from_peer = cache.peer_to_input(&src);
4381 drop(cache);
4382
4383 let req = tl::functions::messages::ForwardMessages {
4384 silent: false,
4385 background: false,
4386 with_my_score: false,
4387 drop_author: false,
4388 drop_media_captions: false,
4389 noforwards: false,
4390 from_peer,
4391 id: message_ids.to_vec(),
4392 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
4393 to_peer,
4394 top_msg_id: None,
4395 reply_to: None,
4396 schedule_date: None,
4397 schedule_repeat_period: None,
4398 send_as: None,
4399 quick_reply_shortcut: None,
4400 effect: None,
4401 video_timestamp: None,
4402 allow_paid_stars: None,
4403 allow_paid_floodskip: false,
4404 suggested_post: None,
4405 };
4406 let body = self.rpc_call_raw(&req).await?;
4407 let mut out = Vec::new();
4409 if body.len() >= 4 {
4410 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
4411 if cid == 0x74ae4240 || cid == 0x725b04c3 {
4412 let mut cur = Cursor::from_slice(&body);
4413 let updates_opt = tl::enums::Updates::deserialize(&mut cur).ok();
4414 let raw_updates = match updates_opt {
4415 Some(tl::enums::Updates::Updates(u)) => u.updates,
4416 Some(tl::enums::Updates::Combined(u)) => u.updates,
4417 _ => vec![],
4418 };
4419 for upd in raw_updates {
4420 match upd {
4421 tl::enums::Update::NewMessage(u) => {
4422 out.push(
4423 update::IncomingMessage::from_raw(u.message)
4424 .with_client(self.clone()),
4425 );
4426 }
4427 tl::enums::Update::NewChannelMessage(u) => {
4428 out.push(
4429 update::IncomingMessage::from_raw(u.message)
4430 .with_client(self.clone()),
4431 );
4432 }
4433 _ => {}
4434 }
4435 }
4436 }
4437 }
4438 Ok(out)
4439 }
4440
4441 pub async fn delete_messages(
4443 &self,
4444 message_ids: Vec<i32>,
4445 revoke: bool,
4446 ) -> Result<(), InvocationError> {
4447 let req = tl::functions::messages::DeleteMessages {
4448 revoke,
4449 id: message_ids,
4450 };
4451 self.rpc_write(&req).await
4452 }
4453
4454 pub async fn get_messages_by_id(
4456 &self,
4457 peer: impl Into<PeerRef>,
4458 ids: &[i32],
4459 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4460 let peer = peer.into().resolve(self).await?;
4461 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4462 let id_list: Vec<tl::enums::InputMessage> = ids
4463 .iter()
4464 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
4465 .collect();
4466 let req = tl::functions::channels::GetMessages {
4467 channel: match &input_peer {
4468 tl::enums::InputPeer::Channel(c) => {
4469 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4470 channel_id: c.channel_id,
4471 access_hash: c.access_hash,
4472 })
4473 }
4474 _ => return self.get_messages_user(input_peer, id_list).await,
4475 },
4476 id: id_list,
4477 };
4478 let body = self.rpc_call_raw(&req).await?;
4479 let mut cur = Cursor::from_slice(&body);
4480 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4481 tl::enums::messages::Messages::Messages(m) => m.messages,
4482 tl::enums::messages::Messages::Slice(m) => m.messages,
4483 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4484 tl::enums::messages::Messages::NotModified(_) => vec![],
4485 };
4486 Ok(msgs
4487 .into_iter()
4488 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4489 .collect())
4490 }
4491
4492 async fn get_messages_user(
4493 &self,
4494 _peer: tl::enums::InputPeer,
4495 ids: Vec<tl::enums::InputMessage>,
4496 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4497 let req = tl::functions::messages::GetMessages { id: ids };
4498 let body = self.rpc_call_raw(&req).await?;
4499 let mut cur = Cursor::from_slice(&body);
4500 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4501 tl::enums::messages::Messages::Messages(m) => m.messages,
4502 tl::enums::messages::Messages::Slice(m) => m.messages,
4503 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4504 tl::enums::messages::Messages::NotModified(_) => vec![],
4505 };
4506 Ok(msgs
4507 .into_iter()
4508 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4509 .collect())
4510 }
4511
4512 pub async fn get_pinned_message(
4514 &self,
4515 peer: impl Into<PeerRef>,
4516 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
4517 let peer = peer.into().resolve(self).await?;
4518 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4519 let req = tl::functions::messages::Search {
4520 peer: input_peer,
4521 q: String::new(),
4522 from_id: None,
4523 saved_peer_id: None,
4524 saved_reaction: None,
4525 top_msg_id: None,
4526 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
4527 min_date: 0,
4528 max_date: 0,
4529 offset_id: 0,
4530 add_offset: 0,
4531 limit: 1,
4532 max_id: 0,
4533 min_id: 0,
4534 hash: 0,
4535 };
4536 let body = self.rpc_call_raw(&req).await?;
4537 let mut cur = Cursor::from_slice(&body);
4538 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4539 tl::enums::messages::Messages::Messages(m) => m.messages,
4540 tl::enums::messages::Messages::Slice(m) => m.messages,
4541 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4542 tl::enums::messages::Messages::NotModified(_) => vec![],
4543 };
4544 Ok(msgs
4545 .into_iter()
4546 .next()
4547 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone())))
4548 }
4549
4550 pub async fn pin_message(
4552 &self,
4553 peer: impl Into<PeerRef>,
4554 message_id: i32,
4555 silent: bool,
4556 unpin: bool,
4557 pm_oneside: bool,
4558 ) -> Result<(), InvocationError> {
4559 let peer = peer.into().resolve(self).await?;
4560 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4561 let req = tl::functions::messages::UpdatePinnedMessage {
4562 silent,
4563 unpin,
4564 pm_oneside,
4565 peer: input_peer,
4566 id: message_id,
4567 };
4568 self.rpc_write(&req).await
4569 }
4570
4571 pub async fn unpin_message(
4573 &self,
4574 peer: impl Into<PeerRef>,
4575 message_id: i32,
4576 ) -> Result<(), InvocationError> {
4577 self.pin_message(peer, message_id, true, true, false).await
4578 }
4579
4580 pub async fn get_reply_to_message(
4595 &self,
4596 message: &update::IncomingMessage,
4597 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
4598 let reply_id = match message.reply_to_message_id() {
4599 Some(id) => id,
4600 None => return Ok(None),
4601 };
4602 let peer = match message.peer_id() {
4603 Some(p) => p.clone(),
4604 None => return Ok(None),
4605 };
4606 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4607 let id = vec![tl::enums::InputMessage::Id(tl::types::InputMessageId {
4608 id: reply_id,
4609 })];
4610
4611 let result = match &input_peer {
4612 tl::enums::InputPeer::Channel(c) => {
4613 let req = tl::functions::channels::GetMessages {
4614 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4615 channel_id: c.channel_id,
4616 access_hash: c.access_hash,
4617 }),
4618 id,
4619 };
4620 self.rpc_call_raw(&req).await?
4621 }
4622 _ => {
4623 let req = tl::functions::messages::GetMessages { id };
4624 self.rpc_call_raw(&req).await?
4625 }
4626 };
4627
4628 let mut cur = Cursor::from_slice(&result);
4629 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4630 tl::enums::messages::Messages::Messages(m) => m.messages,
4631 tl::enums::messages::Messages::Slice(m) => m.messages,
4632 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4633 tl::enums::messages::Messages::NotModified(_) => vec![],
4634 };
4635 Ok(msgs
4636 .into_iter()
4637 .next()
4638 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone())))
4639 }
4640
4641 pub async fn unpin_all_messages(
4643 &self,
4644 peer: impl Into<PeerRef>,
4645 ) -> Result<(), InvocationError> {
4646 let peer = peer.into().resolve(self).await?;
4647 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4648 let req = tl::functions::messages::UnpinAllMessages {
4649 peer: input_peer,
4650 top_msg_id: None,
4651 saved_peer_id: None,
4652 };
4653 self.rpc_write(&req).await
4654 }
4655
4656 pub async fn search_messages(
4661 &self,
4662 peer: impl Into<PeerRef>,
4663 query: &str,
4664 limit: i32,
4665 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4666 self.search(peer, query).limit(limit).fetch(self).await
4667 }
4668
4669 pub fn search(&self, peer: impl Into<PeerRef>, query: &str) -> SearchBuilder {
4671 SearchBuilder::new(peer.into(), query.to_string())
4672 }
4673
4674 pub async fn search_global(
4676 &self,
4677 query: &str,
4678 limit: i32,
4679 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4680 self.search_global_builder(query)
4681 .limit(limit)
4682 .fetch(self)
4683 .await
4684 }
4685
4686 pub fn search_global_builder(&self, query: &str) -> GlobalSearchBuilder {
4688 GlobalSearchBuilder::new(query.to_string())
4689 }
4690
4691 pub async fn get_scheduled_messages(
4708 &self,
4709 peer: impl Into<PeerRef>,
4710 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4711 let peer = peer.into().resolve(self).await?;
4712 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4713 let req = tl::functions::messages::GetScheduledHistory {
4714 peer: input_peer,
4715 hash: 0,
4716 };
4717 let body = self.rpc_call_raw(&req).await?;
4718 let mut cur = Cursor::from_slice(&body);
4719 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4720 tl::enums::messages::Messages::Messages(m) => m.messages,
4721 tl::enums::messages::Messages::Slice(m) => m.messages,
4722 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4723 tl::enums::messages::Messages::NotModified(_) => vec![],
4724 };
4725 Ok(msgs
4726 .into_iter()
4727 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4728 .collect())
4729 }
4730
4731 pub async fn delete_scheduled_messages(
4733 &self,
4734 peer: impl Into<PeerRef>,
4735 ids: Vec<i32>,
4736 ) -> Result<(), InvocationError> {
4737 let peer = peer.into().resolve(self).await?;
4738 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4739 let req = tl::functions::messages::DeleteScheduledMessages {
4740 peer: input_peer,
4741 id: ids,
4742 };
4743 self.rpc_write(&req).await
4744 }
4745
4746 pub async fn edit_inline_message(
4764 &self,
4765 id: tl::enums::InputBotInlineMessageId,
4766 new_text: &str,
4767 reply_markup: Option<tl::enums::ReplyMarkup>,
4768 ) -> Result<bool, InvocationError> {
4769 let req = tl::functions::messages::EditInlineBotMessage {
4770 no_webpage: false,
4771 invert_media: false,
4772 id,
4773 message: Some(new_text.to_string()),
4774 media: None,
4775 reply_markup,
4776 entities: None,
4777 };
4778 let body = self.rpc_call_raw(&req).await?;
4779 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4781 }
4782
4783 pub async fn answer_callback_query(
4785 &self,
4786 query_id: i64,
4787 text: Option<&str>,
4788 alert: bool,
4789 ) -> Result<bool, InvocationError> {
4790 let req = tl::functions::messages::SetBotCallbackAnswer {
4791 alert,
4792 query_id,
4793 message: text.map(|s| s.to_string()),
4794 url: None,
4795 cache_time: 0,
4796 };
4797 let body = self.rpc_call_raw(&req).await?;
4798 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4799 }
4800
4801 pub async fn answer_inline_query(
4802 &self,
4803 query_id: i64,
4804 results: Vec<tl::enums::InputBotInlineResult>,
4805 cache_time: i32,
4806 is_personal: bool,
4807 next_offset: Option<String>,
4808 ) -> Result<bool, InvocationError> {
4809 let req = tl::functions::messages::SetInlineBotResults {
4810 gallery: false,
4811 private: is_personal,
4812 query_id,
4813 results,
4814 cache_time,
4815 next_offset,
4816 switch_pm: None,
4817 switch_webview: None,
4818 };
4819 let body = self.rpc_call_raw(&req).await?;
4820 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4821 }
4822
4823 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
4827 let req = tl::functions::messages::GetDialogs {
4828 exclude_pinned: false,
4829 folder_id: None,
4830 offset_date: 0,
4831 offset_id: 0,
4832 offset_peer: tl::enums::InputPeer::Empty,
4833 limit,
4834 hash: 0,
4835 };
4836
4837 let body = self.rpc_call_raw(&req).await?;
4838 let mut cur = Cursor::from_slice(&body);
4839 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4840 tl::enums::messages::Dialogs::Dialogs(d) => d,
4841 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
4842 dialogs: d.dialogs,
4843 messages: d.messages,
4844 chats: d.chats,
4845 users: d.users,
4846 },
4847 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
4848 };
4849
4850 let msg_map: HashMap<i32, tl::enums::Message> = raw
4852 .messages
4853 .into_iter()
4854 .map(|m| {
4855 let id = match &m {
4856 tl::enums::Message::Message(x) => x.id,
4857 tl::enums::Message::Service(x) => x.id,
4858 tl::enums::Message::Empty(x) => x.id,
4859 };
4860 (id, m)
4861 })
4862 .collect();
4863
4864 let user_map: HashMap<i64, tl::enums::User> = raw
4866 .users
4867 .into_iter()
4868 .filter_map(|u| {
4869 if let tl::enums::User::User(ref uu) = u {
4870 Some((uu.id, u))
4871 } else {
4872 None
4873 }
4874 })
4875 .collect();
4876
4877 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4879 .chats
4880 .into_iter()
4881 .map(|c| {
4882 let id = match &c {
4883 tl::enums::Chat::Chat(x) => x.id,
4884 tl::enums::Chat::Forbidden(x) => x.id,
4885 tl::enums::Chat::Channel(x) => x.id,
4886 tl::enums::Chat::ChannelForbidden(x) => x.id,
4887 tl::enums::Chat::Empty(x) => x.id,
4888 };
4889 (id, c)
4890 })
4891 .collect();
4892
4893 {
4895 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4896 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4897 self.cache_users_and_chats(&u_list, &c_list).await;
4898 }
4899
4900 let result = raw
4901 .dialogs
4902 .into_iter()
4903 .map(|d| {
4904 let top_id = match &d {
4905 tl::enums::Dialog::Dialog(x) => x.top_message,
4906 _ => 0,
4907 };
4908 let peer = match &d {
4909 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
4910 _ => None,
4911 };
4912
4913 let message = msg_map.get(&top_id).cloned();
4914 let entity = peer.and_then(|p| match p {
4915 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
4916 _ => None,
4917 });
4918 let chat = peer.and_then(|p| match p {
4919 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
4920 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
4921 _ => None,
4922 });
4923
4924 Dialog {
4925 raw: d,
4926 message,
4927 entity,
4928 chat,
4929 }
4930 })
4931 .collect();
4932
4933 Ok(result)
4934 }
4935
4936 #[allow(dead_code)]
4938 async fn get_dialogs_raw(
4939 &self,
4940 req: tl::functions::messages::GetDialogs,
4941 ) -> Result<Vec<Dialog>, InvocationError> {
4942 let body = self.rpc_call_raw(&req).await?;
4943 let mut cur = Cursor::from_slice(&body);
4944 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4945 tl::enums::messages::Dialogs::Dialogs(d) => d,
4946 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
4947 dialogs: d.dialogs,
4948 messages: d.messages,
4949 chats: d.chats,
4950 users: d.users,
4951 },
4952 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
4953 };
4954
4955 let msg_map: HashMap<i32, tl::enums::Message> = raw
4956 .messages
4957 .into_iter()
4958 .map(|m| {
4959 let id = match &m {
4960 tl::enums::Message::Message(x) => x.id,
4961 tl::enums::Message::Service(x) => x.id,
4962 tl::enums::Message::Empty(x) => x.id,
4963 };
4964 (id, m)
4965 })
4966 .collect();
4967
4968 let user_map: HashMap<i64, tl::enums::User> = raw
4969 .users
4970 .into_iter()
4971 .filter_map(|u| {
4972 if let tl::enums::User::User(ref uu) = u {
4973 Some((uu.id, u))
4974 } else {
4975 None
4976 }
4977 })
4978 .collect();
4979
4980 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4981 .chats
4982 .into_iter()
4983 .map(|c| {
4984 let id = match &c {
4985 tl::enums::Chat::Chat(x) => x.id,
4986 tl::enums::Chat::Forbidden(x) => x.id,
4987 tl::enums::Chat::Channel(x) => x.id,
4988 tl::enums::Chat::ChannelForbidden(x) => x.id,
4989 tl::enums::Chat::Empty(x) => x.id,
4990 };
4991 (id, c)
4992 })
4993 .collect();
4994
4995 {
4996 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4997 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4998 self.cache_users_and_chats(&u_list, &c_list).await;
4999 }
5000
5001 let result = raw
5002 .dialogs
5003 .into_iter()
5004 .map(|d| {
5005 let top_id = match &d {
5006 tl::enums::Dialog::Dialog(x) => x.top_message,
5007 _ => 0,
5008 };
5009 let peer = match &d {
5010 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
5011 _ => None,
5012 };
5013
5014 let message = msg_map.get(&top_id).cloned();
5015 let entity = peer.and_then(|p| match p {
5016 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
5017 _ => None,
5018 });
5019 let chat = peer.and_then(|p| match p {
5020 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
5021 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
5022 _ => None,
5023 });
5024
5025 Dialog {
5026 raw: d,
5027 message,
5028 entity,
5029 chat,
5030 }
5031 })
5032 .collect();
5033
5034 Ok(result)
5035 }
5036
5037 async fn get_dialogs_raw_with_count(
5039 &self,
5040 req: tl::functions::messages::GetDialogs,
5041 ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
5042 let body = self.rpc_call_raw(&req).await?;
5043 let mut cur = Cursor::from_slice(&body);
5044 let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
5045 tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
5046 tl::enums::messages::Dialogs::Slice(d) => {
5047 let cnt = Some(d.count);
5048 (
5049 tl::types::messages::Dialogs {
5050 dialogs: d.dialogs,
5051 messages: d.messages,
5052 chats: d.chats,
5053 users: d.users,
5054 },
5055 cnt,
5056 )
5057 }
5058 tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
5059 };
5060
5061 let msg_map: HashMap<i32, tl::enums::Message> = raw
5062 .messages
5063 .into_iter()
5064 .map(|m| {
5065 let id = match &m {
5066 tl::enums::Message::Message(x) => x.id,
5067 tl::enums::Message::Service(x) => x.id,
5068 tl::enums::Message::Empty(x) => x.id,
5069 };
5070 (id, m)
5071 })
5072 .collect();
5073
5074 let user_map: HashMap<i64, tl::enums::User> = raw
5075 .users
5076 .into_iter()
5077 .filter_map(|u| {
5078 if let tl::enums::User::User(ref uu) = u {
5079 Some((uu.id, u))
5080 } else {
5081 None
5082 }
5083 })
5084 .collect();
5085
5086 let chat_map: HashMap<i64, tl::enums::Chat> = raw
5087 .chats
5088 .into_iter()
5089 .map(|c| {
5090 let id = match &c {
5091 tl::enums::Chat::Chat(x) => x.id,
5092 tl::enums::Chat::Forbidden(x) => x.id,
5093 tl::enums::Chat::Channel(x) => x.id,
5094 tl::enums::Chat::ChannelForbidden(x) => x.id,
5095 tl::enums::Chat::Empty(x) => x.id,
5096 };
5097 (id, c)
5098 })
5099 .collect();
5100
5101 {
5102 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
5103 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
5104 self.cache_users_and_chats(&u_list, &c_list).await;
5105 }
5106
5107 let result = raw
5108 .dialogs
5109 .into_iter()
5110 .map(|d| {
5111 let top_id = match &d {
5112 tl::enums::Dialog::Dialog(x) => x.top_message,
5113 _ => 0,
5114 };
5115 let peer = match &d {
5116 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
5117 _ => None,
5118 };
5119 let message = msg_map.get(&top_id).cloned();
5120 let entity = peer.and_then(|p| match p {
5121 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
5122 _ => None,
5123 });
5124 let chat = peer.and_then(|p| match p {
5125 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
5126 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
5127 _ => None,
5128 });
5129 Dialog {
5130 raw: d,
5131 message,
5132 entity,
5133 chat,
5134 }
5135 })
5136 .collect();
5137
5138 Ok((result, count))
5139 }
5140
5141 async fn get_messages_with_count(
5143 &self,
5144 peer: tl::enums::InputPeer,
5145 limit: i32,
5146 offset_id: i32,
5147 ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
5148 let req = tl::functions::messages::GetHistory {
5149 peer,
5150 offset_id,
5151 offset_date: 0,
5152 add_offset: 0,
5153 limit,
5154 max_id: 0,
5155 min_id: 0,
5156 hash: 0,
5157 };
5158 let body = self.rpc_call_raw(&req).await?;
5159 let mut cur = Cursor::from_slice(&body);
5160 let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
5161 tl::enums::messages::Messages::Messages(m) => (m.messages, None),
5162 tl::enums::messages::Messages::Slice(m) => {
5163 let cnt = Some(m.count);
5164 (m.messages, cnt)
5165 }
5166 tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
5167 tl::enums::messages::Messages::NotModified(_) => (vec![], None),
5168 };
5169 Ok((
5170 msgs.into_iter()
5171 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
5172 .collect(),
5173 count,
5174 ))
5175 }
5176
5177 pub async fn download_media_to_file(
5188 &self,
5189 location: tl::enums::InputFileLocation,
5190 path: impl AsRef<std::path::Path>,
5191 ) -> Result<(), InvocationError> {
5192 let bytes = self.download_media(location).await?;
5193 std::fs::write(path, &bytes).map_err(InvocationError::Io)?;
5194 Ok(())
5195 }
5196
5197 pub async fn delete_dialog(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5198 let peer = peer.into().resolve(self).await?;
5199 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5200 let req = tl::functions::messages::DeleteHistory {
5201 just_clear: false,
5202 revoke: false,
5203 peer: input_peer,
5204 max_id: 0,
5205 min_date: None,
5206 max_date: None,
5207 };
5208 self.rpc_write(&req).await
5209 }
5210
5211 pub async fn mark_as_read(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5213 let peer = peer.into().resolve(self).await?;
5214 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5215 match &input_peer {
5216 tl::enums::InputPeer::Channel(c) => {
5217 let req = tl::functions::channels::ReadHistory {
5218 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
5219 channel_id: c.channel_id,
5220 access_hash: c.access_hash,
5221 }),
5222 max_id: 0,
5223 };
5224 self.rpc_call_raw(&req).await?;
5225 }
5226 _ => {
5227 let req = tl::functions::messages::ReadHistory {
5228 peer: input_peer,
5229 max_id: 0,
5230 };
5231 self.rpc_call_raw(&req).await?;
5232 }
5233 }
5234 Ok(())
5235 }
5236
5237 pub async fn clear_mentions(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5239 let peer = peer.into().resolve(self).await?;
5240 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5241 let req = tl::functions::messages::ReadMentions {
5242 peer: input_peer,
5243 top_msg_id: None,
5244 };
5245 self.rpc_write(&req).await
5246 }
5247
5248 pub async fn send_chat_action(
5256 &self,
5257 peer: impl Into<PeerRef>,
5258 action: tl::enums::SendMessageAction,
5259 ) -> Result<(), InvocationError> {
5260 let peer = peer.into().resolve(self).await?;
5261 self.send_chat_action_ex(peer, action, None).await
5262 }
5263
5264 pub async fn join_chat(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
5268 let peer = peer.into().resolve(self).await?;
5269 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
5270 match input_peer {
5271 tl::enums::InputPeer::Channel(c) => {
5272 let req = tl::functions::channels::JoinChannel {
5273 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
5274 channel_id: c.channel_id,
5275 access_hash: c.access_hash,
5276 }),
5277 };
5278 self.rpc_call_raw(&req).await?;
5279 }
5280 tl::enums::InputPeer::Chat(c) => {
5281 let req = tl::functions::messages::AddChatUser {
5282 chat_id: c.chat_id,
5283 user_id: tl::enums::InputUser::UserSelf,
5284 fwd_limit: 0,
5285 };
5286 self.rpc_call_raw(&req).await?;
5287 }
5288 _ => {
5289 return Err(InvocationError::Deserialize(
5290 "cannot join this peer type".into(),
5291 ));
5292 }
5293 }
5294 Ok(())
5295 }
5296
5297 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
5299 let hash = Self::parse_invite_hash(link)
5300 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
5301 let req = tl::functions::messages::ImportChatInvite {
5302 hash: hash.to_string(),
5303 };
5304 self.rpc_write(&req).await
5305 }
5306
5307 pub fn parse_invite_hash(link: &str) -> Option<&str> {
5309 if let Some(pos) = link.find("/+") {
5310 return Some(&link[pos + 2..]);
5311 }
5312 if let Some(pos) = link.find("/joinchat/") {
5313 return Some(&link[pos + 10..]);
5314 }
5315 None
5316 }
5317
5318 pub async fn get_messages(
5322 &self,
5323 peer: tl::enums::InputPeer,
5324 limit: i32,
5325 offset_id: i32,
5326 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
5327 let req = tl::functions::messages::GetHistory {
5328 peer,
5329 offset_id,
5330 offset_date: 0,
5331 add_offset: 0,
5332 limit,
5333 max_id: 0,
5334 min_id: 0,
5335 hash: 0,
5336 };
5337 let body = self.rpc_call_raw(&req).await?;
5338 let mut cur = Cursor::from_slice(&body);
5339 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
5340 tl::enums::messages::Messages::Messages(m) => m.messages,
5341 tl::enums::messages::Messages::Slice(m) => m.messages,
5342 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
5343 tl::enums::messages::Messages::NotModified(_) => vec![],
5344 };
5345 Ok(msgs
5346 .into_iter()
5347 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
5348 .collect())
5349 }
5350
5351 pub async fn resolve_peer(&self, peer: &str) -> Result<tl::enums::Peer, InvocationError> {
5355 match peer.trim() {
5356 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
5357 username if username.starts_with('@') => self.resolve_username(&username[1..]).await,
5358 id_str => {
5359 if let Ok(id) = id_str.parse::<i64>() {
5360 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
5361 } else {
5362 Err(InvocationError::Deserialize(format!(
5363 "cannot resolve peer: {peer}"
5364 )))
5365 }
5366 }
5367 }
5368 }
5369
5370 pub async fn resolve_username(
5374 &self,
5375 username: &str,
5376 ) -> Result<tl::enums::Peer, InvocationError> {
5377 let req = tl::functions::contacts::ResolveUsername {
5378 username: username.to_string(),
5379 referer: None,
5380 };
5381 let body = self.rpc_call_raw(&req).await?;
5382 let mut cur = Cursor::from_slice(&body);
5383 let tl::enums::contacts::ResolvedPeer::ResolvedPeer(resolved) =
5384 tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)?;
5385 self.cache_users_slice(&resolved.users).await;
5387 self.cache_chats_slice(&resolved.chats).await;
5388 Ok(resolved.peer)
5389 }
5390
5391 fn spawn_salt_fetch_if_needed(&self) {
5401 if self
5402 .inner
5403 .salt_request_in_flight
5404 .compare_exchange(
5405 false,
5406 true,
5407 std::sync::atomic::Ordering::SeqCst,
5408 std::sync::atomic::Ordering::SeqCst,
5409 )
5410 .is_err()
5411 {
5412 return; }
5414 let inner = Arc::clone(&self.inner);
5415 tokio::spawn(async move {
5416 tracing::debug!("[layer] proactive GetFutureSalts spawned");
5417 let mut req_body = Vec::with_capacity(8);
5418 req_body.extend_from_slice(&0xb921bd04_u32.to_le_bytes()); req_body.extend_from_slice(&64_i32.to_le_bytes()); let (wire, fk, fs_msg_id) = {
5421 let mut w = inner.writer.lock().await;
5422 let fk = w.frame_kind.clone();
5423 let (wire, id) = w.enc.pack_body_with_msg_id(&req_body, true);
5424 w.sent_bodies.insert(id, req_body);
5425 (wire, fk, id)
5426 };
5427 let (tx, rx) = tokio::sync::oneshot::channel();
5428 inner.pending.lock().await.insert(fs_msg_id, tx);
5429 let send_ok = {
5430 send_frame_write(&mut *inner.write_half.lock().await, &wire, &fk)
5431 .await
5432 .is_ok()
5433 };
5434 if !send_ok {
5435 inner.pending.lock().await.remove(&fs_msg_id);
5436 inner.writer.lock().await.sent_bodies.remove(&fs_msg_id);
5437 inner
5438 .salt_request_in_flight
5439 .store(false, std::sync::atomic::Ordering::SeqCst);
5440 return;
5441 }
5442 let _ = rx.await;
5443 inner
5444 .salt_request_in_flight
5445 .store(false, std::sync::atomic::Ordering::SeqCst);
5446 });
5447 }
5448
5449 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
5450 let body = self.rpc_call_raw(req).await?;
5451 let mut cur = Cursor::from_slice(&body);
5452 R::Return::deserialize(&mut cur).map_err(Into::into)
5453 }
5454
5455 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
5456 let mut rl = RetryLoop::new(Arc::clone(&self.inner.retry_policy));
5457 loop {
5458 match self.do_rpc_call(req).await {
5459 Ok(body) => return Ok(body),
5460 Err(e) if e.migrate_dc_id().is_some() => {
5461 self.migrate_to(e.migrate_dc_id().unwrap()).await?;
5464 }
5465 Err(InvocationError::Rpc(ref r)) if r.code == 401 => {
5472 return Err(InvocationError::Rpc(r.clone()));
5473 }
5474 Err(e) => rl.advance(e).await?,
5475 }
5476 }
5477 }
5478
5479 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
5489 let (tx, rx) = oneshot::channel();
5490 let wire = {
5491 let raw_body = req.to_bytes();
5492 let body = maybe_gz_pack(&raw_body);
5494
5495 let mut w = self.inner.writer.lock().await;
5496
5497 if w.advance_salt_if_needed() {
5501 drop(w); self.spawn_salt_fetch_if_needed();
5503 w = self.inner.writer.lock().await;
5504 }
5505
5506 let fk = w.frame_kind.clone();
5507
5508 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
5511
5512 if acks.is_empty() {
5513 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5515 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5517 (wire, fk)
5518 } else {
5519 let ack_body = build_msgs_ack_body(&acks);
5521 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false); let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true); let container_payload = build_container_body(&[
5525 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5526 (req_msg_id, req_seqno, body.as_slice()),
5527 ]);
5528
5529 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5530
5531 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5534 tracing::debug!(
5535 "[layer] container: bundled {} acks + request (cid={container_msg_id})",
5536 acks.len()
5537 );
5538 (wire, fk)
5539 }
5540 };
5542 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5544 match rx.await {
5545 Ok(result) => result,
5546 Err(_) => Err(InvocationError::Deserialize(
5547 "RPC channel closed (reader died?)".into(),
5548 )),
5549 }
5550 }
5551
5552 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
5555 let mut fail_count = NonZeroU32::new(1).unwrap();
5556 let mut slept_so_far = Duration::default();
5557 loop {
5558 let result = self.do_rpc_write(req).await;
5559 match result {
5560 Ok(()) => return Ok(()),
5561 Err(e) => {
5562 let ctx = RetryContext {
5563 fail_count,
5564 slept_so_far,
5565 error: e,
5566 };
5567 match self.inner.retry_policy.should_retry(&ctx) {
5568 ControlFlow::Continue(delay) => {
5569 sleep(delay).await;
5570 slept_so_far += delay;
5571 fail_count = fail_count.saturating_add(1);
5572 }
5573 ControlFlow::Break(()) => return Err(ctx.error),
5574 }
5575 }
5576 }
5577 }
5578 }
5579
5580 async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
5581 let (tx, rx) = oneshot::channel();
5582 let wire = {
5583 let raw_body = req.to_bytes();
5584 let body = maybe_gz_pack(&raw_body);
5586
5587 let mut w = self.inner.writer.lock().await;
5588 let fk = w.frame_kind.clone();
5589
5590 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
5592
5593 if acks.is_empty() {
5594 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5595 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5597 (wire, fk)
5598 } else {
5599 let ack_body = build_msgs_ack_body(&acks);
5600 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
5601 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
5602 let container_payload = build_container_body(&[
5603 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5604 (req_msg_id, req_seqno, body.as_slice()),
5605 ]);
5606 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5607 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5610 tracing::debug!(
5611 "[layer] write container: bundled {} acks + write (cid={container_msg_id})",
5612 acks.len()
5613 );
5614 (wire, fk)
5615 }
5616 };
5618 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5619 match rx.await {
5620 Ok(result) => result.map(|_| ()),
5621 Err(_) => Err(InvocationError::Deserialize(
5622 "rpc_write channel closed".into(),
5623 )),
5624 }
5625 }
5626
5627 async fn init_connection(&self) -> Result<(), InvocationError> {
5630 use tl::functions::{InitConnection, InvokeWithLayer, help::GetConfig};
5631 let req = InvokeWithLayer {
5632 layer: tl::LAYER,
5633 query: InitConnection {
5634 api_id: self.inner.api_id,
5635 device_model: self.inner.device_model.clone(),
5636 system_version: self.inner.system_version.clone(),
5637 app_version: self.inner.app_version.clone(),
5638 system_lang_code: self.inner.system_lang_code.clone(),
5639 lang_pack: self.inner.lang_pack.clone(),
5640 lang_code: self.inner.lang_code.clone(),
5641 proxy: None,
5642 params: None,
5643 query: GetConfig {},
5644 },
5645 };
5646
5647 let body = self.rpc_call_raw_serializable(&req).await?;
5649
5650 let mut cur = Cursor::from_slice(&body);
5651 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
5652 let allow_ipv6 = self.inner.allow_ipv6;
5653 let mut opts = self.inner.dc_options.lock().await;
5654 let mut media_opts = self.inner.media_dc_options.lock().await;
5655 for opt in &cfg.dc_options {
5656 let tl::enums::DcOption::DcOption(o) = opt;
5657 if o.ipv6 && !allow_ipv6 {
5658 continue;
5659 }
5660 let addr = format!("{}:{}", o.ip_address, o.port);
5661 let mut flags = DcFlags::NONE;
5662 if o.ipv6 {
5663 flags.set(DcFlags::IPV6);
5664 }
5665 if o.media_only {
5666 flags.set(DcFlags::MEDIA_ONLY);
5667 }
5668 if o.tcpo_only {
5669 flags.set(DcFlags::TCPO_ONLY);
5670 }
5671 if o.cdn {
5672 flags.set(DcFlags::CDN);
5673 }
5674 if o.r#static {
5675 flags.set(DcFlags::STATIC);
5676 }
5677
5678 if o.media_only || o.cdn {
5679 let e = media_opts.entry(o.id).or_insert_with(|| DcEntry {
5680 dc_id: o.id,
5681 addr: addr.clone(),
5682 auth_key: None,
5683 first_salt: 0,
5684 time_offset: 0,
5685 flags,
5686 });
5687 e.addr = addr;
5688 e.flags = flags;
5689 } else if !o.tcpo_only {
5690 let e = opts.entry(o.id).or_insert_with(|| DcEntry {
5691 dc_id: o.id,
5692 addr: addr.clone(),
5693 auth_key: None,
5694 first_salt: 0,
5695 time_offset: 0,
5696 flags,
5697 });
5698 e.addr = addr;
5699 e.flags = flags;
5700 }
5701 }
5702 tracing::info!(
5703 "[layer] initConnection ✓ ({} DCs, ipv6={})",
5704 cfg.dc_options.len(),
5705 allow_ipv6
5706 );
5707 }
5708 Ok(())
5709 }
5710
5711 async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
5714 let addr = {
5715 let opts = self.inner.dc_options.lock().await;
5716 opts.get(&new_dc_id)
5717 .map(|e| e.addr.clone())
5718 .unwrap_or_else(|| crate::dc_migration::fallback_dc_addr(new_dc_id).to_string())
5719 };
5720 tracing::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
5721
5722 let saved_key = {
5723 let opts = self.inner.dc_options.lock().await;
5724 opts.get(&new_dc_id).and_then(|e| e.auth_key)
5725 };
5726
5727 let socks5 = self.inner.socks5.clone();
5728 let mtproxy = self.inner.mtproxy.clone();
5729 let transport = self.inner.transport.clone();
5730 let conn = if let Some(key) = saved_key {
5731 Connection::connect_with_key(
5732 &addr,
5733 key,
5734 0,
5735 0,
5736 socks5.as_ref(),
5737 &transport,
5738 new_dc_id as i16,
5739 )
5740 .await?
5741 } else {
5742 Connection::connect_raw(
5743 &addr,
5744 socks5.as_ref(),
5745 mtproxy.as_ref(),
5746 &transport,
5747 new_dc_id as i16,
5748 )
5749 .await?
5750 };
5751
5752 let new_key = conn.auth_key_bytes();
5753 {
5754 let mut opts = self.inner.dc_options.lock().await;
5755 let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
5756 dc_id: new_dc_id,
5757 addr: addr.clone(),
5758 auth_key: None,
5759 first_salt: 0,
5760 time_offset: 0,
5761 flags: DcFlags::NONE,
5762 });
5763 entry.auth_key = Some(new_key);
5764 }
5765
5766 let (new_writer, new_wh, new_read, new_fk) = conn.into_writer();
5768 let new_ak = new_writer.enc.auth_key_bytes();
5769 let new_sid = new_writer.enc.session_id();
5770 *self.inner.writer.lock().await = new_writer;
5771 *self.inner.write_half.lock().await = new_wh;
5772 *self.inner.home_dc_id.lock().await = new_dc_id;
5773
5774 let _ = self
5777 .inner
5778 .reconnect_tx
5779 .send((new_read, new_fk, new_ak, new_sid));
5780
5781 loop {
5791 match self.init_connection().await {
5792 Ok(()) => break,
5793 Err(InvocationError::Rpc(ref r)) if r.flood_wait_seconds().is_some() => {
5794 let secs = r.flood_wait_seconds().unwrap();
5795 tracing::warn!(
5796 "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs}: waiting"
5797 );
5798 sleep(Duration::from_secs(secs + 1)).await;
5799 }
5800 Err(e) => return Err(e),
5801 }
5802 }
5803
5804 self.save_session().await.ok();
5805 tracing::info!("[layer] Now on DC{new_dc_id} ✓");
5806 Ok(())
5807 }
5808
5809 pub fn disconnect(&self) {
5819 self.inner.shutdown_token.cancel();
5820 }
5821
5822 pub async fn sync_update_state(&self) {
5830 let _ = self.sync_pts_state().await;
5831 }
5832
5833 async fn cache_user(&self, user: &tl::enums::User) {
5836 self.inner.peer_cache.write().await.cache_user(user);
5837 }
5838
5839 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
5840 let mut cache = self.inner.peer_cache.write().await;
5841 cache.cache_users(users);
5842 }
5843
5844 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
5845 let mut cache = self.inner.peer_cache.write().await;
5846 cache.cache_chats(chats);
5847 }
5848
5849 async fn cache_users_and_chats(&self, users: &[tl::enums::User], chats: &[tl::enums::Chat]) {
5851 let mut cache = self.inner.peer_cache.write().await;
5852 cache.cache_users(users);
5853 cache.cache_chats(chats);
5854 }
5855
5856 #[doc(hidden)]
5858 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
5859 self.cache_users_slice(users).await;
5860 }
5861
5862 #[doc(hidden)]
5863 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
5864 self.cache_chats_slice(chats).await;
5865 }
5866
5867 #[doc(hidden)]
5869 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(
5870 &self,
5871 req: &R,
5872 ) -> Result<Vec<u8>, InvocationError> {
5873 self.rpc_call_raw(req).await
5874 }
5875
5876 async fn rpc_call_raw_serializable<S: tl::Serializable>(
5878 &self,
5879 req: &S,
5880 ) -> Result<Vec<u8>, InvocationError> {
5881 let mut fail_count = NonZeroU32::new(1).unwrap();
5882 let mut slept_so_far = Duration::default();
5883 loop {
5884 match self.do_rpc_write_returning_body(req).await {
5885 Ok(body) => return Ok(body),
5886 Err(e) => {
5887 let ctx = RetryContext {
5888 fail_count,
5889 slept_so_far,
5890 error: e,
5891 };
5892 match self.inner.retry_policy.should_retry(&ctx) {
5893 ControlFlow::Continue(delay) => {
5894 sleep(delay).await;
5895 slept_so_far += delay;
5896 fail_count = fail_count.saturating_add(1);
5897 }
5898 ControlFlow::Break(()) => return Err(ctx.error),
5899 }
5900 }
5901 }
5902 }
5903 }
5904
5905 async fn do_rpc_write_returning_body<S: tl::Serializable>(
5906 &self,
5907 req: &S,
5908 ) -> Result<Vec<u8>, InvocationError> {
5909 let (tx, rx) = oneshot::channel();
5910 let wire = {
5911 let raw_body = req.to_bytes();
5912 let body = maybe_gz_pack(&raw_body); let mut w = self.inner.writer.lock().await;
5914 let fk = w.frame_kind.clone();
5915 let acks: Vec<i64> = w.pending_ack.drain(..).collect(); if acks.is_empty() {
5917 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5918 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5920 (wire, fk)
5921 } else {
5922 let ack_body = build_msgs_ack_body(&acks);
5923 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
5924 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
5925 let container_payload = build_container_body(&[
5926 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5927 (req_msg_id, req_seqno, body.as_slice()),
5928 ]);
5929 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5930 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5933 (wire, fk)
5934 }
5935 };
5937 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5938 match rx.await {
5939 Ok(result) => result,
5940 Err(_) => Err(InvocationError::Deserialize("rpc channel closed".into())),
5941 }
5942 }
5943
5944 pub async fn count_channels(&self) -> Result<usize, InvocationError> {
5947 let mut iter = self.iter_dialogs();
5948 let mut count = 0usize;
5949 while let Some(dialog) = iter.next(self).await? {
5950 if matches!(dialog.peer(), Some(tl::enums::Peer::Channel(_))) {
5951 count += 1;
5952 }
5953 }
5954 Ok(count)
5955 }
5956
5957 pub fn iter_dialogs(&self) -> DialogIter {
5971 DialogIter {
5972 offset_date: 0,
5973 offset_id: 0,
5974 offset_peer: tl::enums::InputPeer::Empty,
5975 done: false,
5976 buffer: VecDeque::new(),
5977 total: None,
5978 }
5979 }
5980
5981 pub fn iter_messages(&self, peer: impl Into<PeerRef>) -> MessageIter {
5995 MessageIter {
5996 unresolved: Some(peer.into()),
5997 peer: None,
5998 offset_id: 0,
5999 done: false,
6000 buffer: VecDeque::new(),
6001 total: None,
6002 }
6003 }
6004
6005 pub async fn resolve_to_input_peer(
6010 &self,
6011 peer: &tl::enums::Peer,
6012 ) -> Result<tl::enums::InputPeer, InvocationError> {
6013 let cache = self.inner.peer_cache.read().await;
6014 match peer {
6015 tl::enums::Peer::User(u) => {
6016 if u.user_id == 0 {
6017 return Ok(tl::enums::InputPeer::PeerSelf);
6018 }
6019 match cache.users.get(&u.user_id) {
6020 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
6021 user_id: u.user_id,
6022 access_hash: hash,
6023 })),
6024 None => Err(InvocationError::Deserialize(format!(
6025 "access_hash unknown for user {}; resolve via username first",
6026 u.user_id
6027 ))),
6028 }
6029 }
6030 tl::enums::Peer::Chat(c) => Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat {
6031 chat_id: c.chat_id,
6032 })),
6033 tl::enums::Peer::Channel(c) => match cache.channels.get(&c.channel_id) {
6034 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
6035 channel_id: c.channel_id,
6036 access_hash: hash,
6037 })),
6038 None => Err(InvocationError::Deserialize(format!(
6039 "access_hash unknown for channel {}; resolve via username first",
6040 c.channel_id
6041 ))),
6042 },
6043 }
6044 }
6045
6046 pub async fn invoke_on_dc<R: RemoteCall>(
6054 &self,
6055 dc_id: i32,
6056 req: &R,
6057 ) -> Result<R::Return, InvocationError> {
6058 let body = self.rpc_on_dc_raw(dc_id, req).await?;
6059 let mut cur = Cursor::from_slice(&body);
6060 R::Return::deserialize(&mut cur).map_err(Into::into)
6061 }
6062
6063 async fn rpc_on_dc_raw<R: RemoteCall>(
6065 &self,
6066 dc_id: i32,
6067 req: &R,
6068 ) -> Result<Vec<u8>, InvocationError> {
6069 let needs_new = {
6071 let pool = self.inner.dc_pool.lock().await;
6072 !pool.has_connection(dc_id)
6073 };
6074
6075 if needs_new {
6076 let addr = {
6077 let opts = self.inner.dc_options.lock().await;
6078 opts.get(&dc_id)
6079 .map(|e| e.addr.clone())
6080 .unwrap_or_else(|| crate::dc_migration::fallback_dc_addr(dc_id).to_string())
6081 };
6082
6083 let socks5 = self.inner.socks5.clone();
6084 let transport = self.inner.transport.clone();
6085 let saved_key = {
6086 let opts = self.inner.dc_options.lock().await;
6087 opts.get(&dc_id).and_then(|e| e.auth_key)
6088 };
6089
6090 let dc_conn = if let Some(key) = saved_key {
6091 dc_pool::DcConnection::connect_with_key(
6092 &addr,
6093 key,
6094 0,
6095 0,
6096 socks5.as_ref(),
6097 &transport,
6098 dc_id as i16,
6099 )
6100 .await?
6101 } else {
6102 let conn = dc_pool::DcConnection::connect_raw(
6103 &addr,
6104 socks5.as_ref(),
6105 &transport,
6106 dc_id as i16,
6107 )
6108 .await?;
6109 let home_dc_id = *self.inner.home_dc_id.lock().await;
6111 if dc_id != home_dc_id
6112 && let Err(e) = self.export_import_auth(dc_id, &conn).await
6113 {
6114 tracing::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
6115 }
6116 conn
6117 };
6118
6119 let key = dc_conn.auth_key_bytes();
6120 {
6121 let mut opts = self.inner.dc_options.lock().await;
6122 if let Some(e) = opts.get_mut(&dc_id) {
6123 e.auth_key = Some(key);
6124 }
6125 }
6126 self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
6127 }
6128
6129 let dc_entries: Vec<DcEntry> = self
6130 .inner
6131 .dc_options
6132 .lock()
6133 .await
6134 .values()
6135 .cloned()
6136 .collect();
6137 self.inner
6138 .dc_pool
6139 .lock()
6140 .await
6141 .invoke_on_dc(dc_id, &dc_entries, req)
6142 .await
6143 }
6144
6145 async fn export_import_auth(
6147 &self,
6148 dc_id: i32,
6149 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
6151 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
6153 let body = self.rpc_call_raw(&export_req).await?;
6154 let mut cur = Cursor::from_slice(&body);
6155 let tl::enums::auth::ExportedAuthorization::ExportedAuthorization(exported) =
6156 tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)?;
6157
6158 let import_req = tl::functions::auth::ImportAuthorization {
6160 id: exported.id,
6161 bytes: exported.bytes,
6162 };
6163 let dc_entries: Vec<DcEntry> = self
6164 .inner
6165 .dc_options
6166 .lock()
6167 .await
6168 .values()
6169 .cloned()
6170 .collect();
6171 self.inner
6172 .dc_pool
6173 .lock()
6174 .await
6175 .invoke_on_dc(dc_id, &dc_entries, &import_req)
6176 .await?;
6177 tracing::debug!("[layer] Auth exported+imported to DC{dc_id} ✓");
6178 Ok(())
6179 }
6180
6181 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
6184 let body = self
6185 .rpc_call_raw(&tl::functions::account::GetPassword {})
6186 .await?;
6187 let mut cur = Cursor::from_slice(&body);
6188 let tl::enums::account::Password::Password(pw) =
6189 tl::enums::account::Password::deserialize(&mut cur)?;
6190 Ok(PasswordToken { password: pw })
6191 }
6192
6193 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
6194 tl::functions::auth::SendCode {
6195 phone_number: phone.to_string(),
6196 api_id: self.inner.api_id,
6197 api_hash: self.inner.api_hash.clone(),
6198 settings: tl::enums::CodeSettings::CodeSettings(tl::types::CodeSettings {
6199 allow_flashcall: false,
6200 current_number: false,
6201 allow_app_hash: false,
6202 allow_missed_call: false,
6203 allow_firebase: false,
6204 unknown_number: false,
6205 logout_tokens: None,
6206 token: None,
6207 app_sandbox: None,
6208 }),
6209 }
6210 }
6211
6212 fn extract_user_name(user: &tl::enums::User) -> String {
6213 match user {
6214 tl::enums::User::User(u) => format!(
6215 "{} {}",
6216 u.first_name.as_deref().unwrap_or(""),
6217 u.last_name.as_deref().unwrap_or("")
6218 )
6219 .trim()
6220 .to_string(),
6221 tl::enums::User::Empty(_) => "(unknown)".into(),
6222 }
6223 }
6224
6225 #[allow(clippy::type_complexity)]
6226 fn extract_password_params(
6227 algo: &tl::enums::PasswordKdfAlgo,
6228 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
6229 match algo {
6230 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
6231 Ok((&a.salt1, &a.salt2, &a.p, a.g))
6232 }
6233 _ => Err(InvocationError::Deserialize(
6234 "unsupported password KDF algo".into(),
6235 )),
6236 }
6237 }
6238}
6239
6240pub(crate) fn attach_client_to_update(u: update::Update, client: &Client) -> update::Update {
6243 match u {
6244 update::Update::NewMessage(msg) => {
6245 update::Update::NewMessage(msg.with_client(client.clone()))
6246 }
6247 update::Update::MessageEdited(msg) => {
6248 update::Update::MessageEdited(msg.with_client(client.clone()))
6249 }
6250 other => other,
6251 }
6252}
6253
6254pub struct DialogIter {
6258 offset_date: i32,
6259 offset_id: i32,
6260 offset_peer: tl::enums::InputPeer,
6261 done: bool,
6262 buffer: VecDeque<Dialog>,
6263 pub total: Option<i32>,
6266}
6267
6268impl DialogIter {
6269 const PAGE_SIZE: i32 = 100;
6270
6271 pub fn total(&self) -> Option<i32> {
6277 self.total
6278 }
6279
6280 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
6282 if let Some(d) = self.buffer.pop_front() {
6283 return Ok(Some(d));
6284 }
6285 if self.done {
6286 return Ok(None);
6287 }
6288
6289 let req = tl::functions::messages::GetDialogs {
6290 exclude_pinned: false,
6291 folder_id: None,
6292 offset_date: self.offset_date,
6293 offset_id: self.offset_id,
6294 offset_peer: self.offset_peer.clone(),
6295 limit: Self::PAGE_SIZE,
6296 hash: 0,
6297 };
6298
6299 let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
6300 if self.total.is_none() {
6302 self.total = count;
6303 }
6304 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
6305 self.done = true;
6306 }
6307
6308 if let Some(last) = dialogs.last() {
6310 self.offset_date = last
6311 .message
6312 .as_ref()
6313 .map(|m| match m {
6314 tl::enums::Message::Message(x) => x.date,
6315 tl::enums::Message::Service(x) => x.date,
6316 _ => 0,
6317 })
6318 .unwrap_or(0);
6319 self.offset_id = last.top_message();
6320 if let Some(peer) = last.peer() {
6321 self.offset_peer = client.inner.peer_cache.read().await.peer_to_input(peer);
6322 }
6323 }
6324
6325 self.buffer.extend(dialogs);
6326 Ok(self.buffer.pop_front())
6327 }
6328}
6329
6330pub struct MessageIter {
6332 unresolved: Option<PeerRef>,
6333 peer: Option<tl::enums::Peer>,
6334 offset_id: i32,
6335 done: bool,
6336 buffer: VecDeque<update::IncomingMessage>,
6337 pub total: Option<i32>,
6341}
6342
6343impl MessageIter {
6344 const PAGE_SIZE: i32 = 100;
6345
6346 pub fn total(&self) -> Option<i32> {
6351 self.total
6352 }
6353
6354 pub async fn next(
6356 &mut self,
6357 client: &Client,
6358 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
6359 if let Some(m) = self.buffer.pop_front() {
6360 return Ok(Some(m));
6361 }
6362 if self.done {
6363 return Ok(None);
6364 }
6365
6366 let peer = if let Some(p) = &self.peer {
6368 p.clone()
6369 } else {
6370 let pr = self.unresolved.take().expect("MessageIter: peer not set");
6371 let p = pr.resolve(client).await?;
6372 self.peer = Some(p.clone());
6373 p
6374 };
6375
6376 let input_peer = client.inner.peer_cache.read().await.peer_to_input(&peer);
6377 let (page, count) = client
6378 .get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id)
6379 .await?;
6380
6381 if self.total.is_none() {
6382 self.total = count;
6383 }
6384
6385 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
6386 self.done = true;
6387 }
6388 if let Some(last) = page.last() {
6389 self.offset_id = last.id();
6390 }
6391
6392 self.buffer.extend(page);
6393 Ok(self.buffer.pop_front())
6394 }
6395}
6396
6397#[doc(hidden)]
6401pub fn random_i64_pub() -> i64 {
6402 random_i64()
6403}
6404
6405pub fn is_bool_true(body: &[u8]) -> bool {
6406 body.len() == 4 && u32::from_le_bytes(body[0..4].try_into().unwrap_or([0u8; 4])) == 0x997275b5
6407}
6408
6409pub fn is_bool_false(body: &[u8]) -> bool {
6410 body.len() == 4 && u32::from_le_bytes(body[0..4].try_into().unwrap_or([0u8; 4])) == 0xbc799737
6411}
6412
6413#[derive(Clone)]
6423enum FrameKind {
6424 Abridged,
6425 Intermediate,
6426 #[allow(dead_code)]
6427 Full {
6428 send_seqno: Arc<std::sync::atomic::AtomicU32>,
6429 recv_seqno: Arc<std::sync::atomic::AtomicU32>,
6430 },
6431 Obfuscated {
6433 cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
6434 },
6435 PaddedIntermediate {
6437 cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
6438 },
6439 FakeTls {
6441 cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
6442 },
6443}
6444
6445#[derive(Clone, Debug)]
6451struct FutureSalt {
6452 valid_since: i32,
6453 valid_until: i32,
6454 salt: i64,
6455}
6456
6457const SALT_USE_DELAY: i32 = 60;
6460
6461struct ConnectionWriter {
6463 enc: EncryptedSession,
6464 frame_kind: FrameKind,
6465 pending_ack: Vec<i64>,
6469 sent_bodies: std::collections::HashMap<i64, Vec<u8>>,
6473 container_map: std::collections::HashMap<i64, i64>,
6479 salts: Vec<FutureSalt>,
6484 start_salt_time: Option<(i32, std::time::Instant)>,
6489}
6490
6491impl ConnectionWriter {
6492 fn auth_key_bytes(&self) -> [u8; 256] {
6493 self.enc.auth_key_bytes()
6494 }
6495 fn first_salt(&self) -> i64 {
6496 self.enc.salt
6497 }
6498 fn time_offset(&self) -> i32 {
6499 self.enc.time_offset
6500 }
6501
6502 fn advance_salt_if_needed(&mut self) -> bool {
6516 let Some((server_now, start_instant)) = self.start_salt_time else {
6517 return self.salts.len() <= 1;
6518 };
6519
6520 let now = server_now + start_instant.elapsed().as_secs() as i32;
6522
6523 while self.salts.len() > 1 && now > self.salts[0].valid_until {
6525 let expired = self.salts.remove(0);
6526 tracing::debug!(
6527 "[layer] salt {:#x} expired (valid_until={}), pruned",
6528 expired.salt,
6529 expired.valid_until,
6530 );
6531 }
6532
6533 if self.salts.len() > 1 {
6536 let best = self
6537 .salts
6538 .iter()
6539 .rev()
6540 .find(|s| s.valid_since + SALT_USE_DELAY <= now)
6541 .map(|s| s.salt);
6542 if let Some(salt) = best
6543 && salt != self.enc.salt
6544 {
6545 tracing::debug!(
6546 "[layer] proactive salt cycle: {:#x} → {:#x}",
6547 self.enc.salt,
6548 salt
6549 );
6550 self.enc.salt = salt;
6551 self.salts.retain(|s| s.valid_since >= now - SALT_USE_DELAY);
6553 if self.salts.is_empty() {
6554 self.salts.push(FutureSalt {
6556 valid_since: 0,
6557 valid_until: i32::MAX,
6558 salt,
6559 });
6560 }
6561 }
6562 }
6563
6564 self.salts.len() <= 1
6565 }
6566}
6567
6568struct Connection {
6569 stream: TcpStream,
6570 enc: EncryptedSession,
6571 frame_kind: FrameKind,
6572}
6573
6574impl Connection {
6575 async fn open_stream(
6577 addr: &str,
6578 socks5: Option<&crate::socks5::Socks5Config>,
6579 transport: &TransportKind,
6580 dc_id: i16,
6581 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6582 let stream = match socks5 {
6583 Some(proxy) => proxy.connect(addr).await?,
6584 None => {
6585 let stream = TcpStream::connect(addr)
6586 .await
6587 .map_err(InvocationError::Io)?;
6588 stream.set_nodelay(true).ok();
6589 {
6590 let sock = socket2::SockRef::from(&stream);
6591 let keepalive = TcpKeepalive::new()
6592 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
6593 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
6594 #[cfg(not(target_os = "windows"))]
6595 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
6596 sock.set_tcp_keepalive(&keepalive).ok();
6597 }
6598 stream
6599 }
6600 };
6601 Self::apply_transport_init(stream, transport, dc_id).await
6602 }
6603
6604 async fn open_stream_mtproxy(
6607 mtproxy: &crate::proxy::MtProxyConfig,
6608 dc_id: i16,
6609 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6610 let stream = mtproxy.connect().await?;
6611 stream.set_nodelay(true).ok();
6612 Self::apply_transport_init(stream, &mtproxy.transport, dc_id).await
6613 }
6614
6615 async fn apply_transport_init(
6616 mut stream: TcpStream,
6617 transport: &TransportKind,
6618 dc_id: i16,
6619 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6620 match transport {
6621 TransportKind::Abridged => {
6622 stream.write_all(&[0xef]).await?;
6623 Ok((stream, FrameKind::Abridged))
6624 }
6625 TransportKind::Intermediate => {
6626 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
6627 Ok((stream, FrameKind::Intermediate))
6628 }
6629 TransportKind::Full => {
6630 Ok((
6632 stream,
6633 FrameKind::Full {
6634 send_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
6635 recv_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
6636 },
6637 ))
6638 }
6639 TransportKind::Obfuscated { secret } => {
6640 use sha2::Digest;
6641
6642 let mut nonce = [0u8; 64];
6647 loop {
6648 getrandom::getrandom(&mut nonce)
6649 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
6650 let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
6651 let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
6652 let bad = nonce[0] == 0xEF
6653 || first == 0x44414548 || first == 0x54534F50 || first == 0x20544547 || first == 0xEEEEEEEE
6657 || first == 0xDDDDDDDD
6658 || first == 0x02010316
6659 || second == 0x00000000;
6660 if !bad {
6661 break;
6662 }
6663 }
6664
6665 let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
6671 let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
6672 let mut rev48 = nonce[8..56].to_vec();
6673 rev48.reverse();
6674 let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
6675 let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
6676
6677 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
6678 let mut h = sha2::Sha256::new();
6679 h.update(tx_raw);
6680 h.update(s.as_ref());
6681 let tx: [u8; 32] = h.finalize().into();
6682
6683 let mut h = sha2::Sha256::new();
6684 h.update(rx_raw);
6685 h.update(s.as_ref());
6686 let rx: [u8; 32] = h.finalize().into();
6687 (tx, rx)
6688 } else {
6689 (tx_raw, rx_raw)
6690 };
6691
6692 nonce[56] = 0xef;
6695 nonce[57] = 0xef;
6696 nonce[58] = 0xef;
6697 nonce[59] = 0xef;
6698 let dc_bytes = dc_id.to_le_bytes();
6699 nonce[60] = dc_bytes[0];
6700 nonce[61] = dc_bytes[1];
6701
6702 let mut cipher =
6712 layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
6713 let mut skip = [0u8; 56];
6715 cipher.encrypt(&mut skip);
6716 cipher.encrypt(&mut nonce[56..64]);
6718
6719 stream.write_all(&nonce).await?;
6720
6721 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
6722 Ok((stream, FrameKind::Obfuscated { cipher: cipher_arc }))
6723 }
6724 TransportKind::PaddedIntermediate { secret } => {
6725 use sha2::Digest;
6726 let mut nonce = [0u8; 64];
6727 loop {
6728 getrandom::getrandom(&mut nonce)
6729 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
6730 let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
6731 let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
6732 let bad = nonce[0] == 0xEF
6733 || first == 0x44414548
6734 || first == 0x54534F50
6735 || first == 0x20544547
6736 || first == 0xEEEEEEEE
6737 || first == 0xDDDDDDDD
6738 || first == 0x02010316
6739 || second == 0x00000000;
6740 if !bad {
6741 break;
6742 }
6743 }
6744 let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
6745 let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
6746 let mut rev48 = nonce[8..56].to_vec();
6747 rev48.reverse();
6748 let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
6749 let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
6750 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
6751 let mut h = sha2::Sha256::new();
6752 h.update(tx_raw);
6753 h.update(s.as_ref());
6754 let tx: [u8; 32] = h.finalize().into();
6755 let mut h = sha2::Sha256::new();
6756 h.update(rx_raw);
6757 h.update(s.as_ref());
6758 let rx: [u8; 32] = h.finalize().into();
6759 (tx, rx)
6760 } else {
6761 (tx_raw, rx_raw)
6762 };
6763 nonce[56] = 0xdd;
6765 nonce[57] = 0xdd;
6766 nonce[58] = 0xdd;
6767 nonce[59] = 0xdd;
6768 let dc_bytes = dc_id.to_le_bytes();
6769 nonce[60] = dc_bytes[0];
6770 nonce[61] = dc_bytes[1];
6771 let mut cipher =
6772 layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
6773 let mut skip = [0u8; 56];
6774 cipher.encrypt(&mut skip);
6775 cipher.encrypt(&mut nonce[56..64]);
6776 stream.write_all(&nonce).await?;
6777 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
6778 Ok((stream, FrameKind::PaddedIntermediate { cipher: cipher_arc }))
6779 }
6780 TransportKind::FakeTls { secret, domain } => {
6781 let domain_bytes = domain.as_bytes();
6785 let mut session_id = [0u8; 32];
6786 getrandom::getrandom(&mut session_id)
6787 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
6788
6789 let cipher_suites: &[u8] = &[0x00, 0x04, 0x13, 0x01, 0x13, 0x02];
6791 let compression: &[u8] = &[0x01, 0x00];
6792 let sni_name_len = domain_bytes.len() as u16;
6793 let sni_list_len = sni_name_len + 3;
6794 let sni_ext_len = sni_list_len + 2;
6795 let mut sni_ext = Vec::new();
6796 sni_ext.extend_from_slice(&[0x00, 0x00]);
6797 sni_ext.extend_from_slice(&sni_ext_len.to_be_bytes());
6798 sni_ext.extend_from_slice(&sni_list_len.to_be_bytes());
6799 sni_ext.push(0x00);
6800 sni_ext.extend_from_slice(&sni_name_len.to_be_bytes());
6801 sni_ext.extend_from_slice(domain_bytes);
6802 let sup_ver: &[u8] = &[0x00, 0x2b, 0x00, 0x03, 0x02, 0x03, 0x04];
6803 let sup_grp: &[u8] = &[0x00, 0x0a, 0x00, 0x04, 0x00, 0x02, 0x00, 0x1d];
6804 let sess_tick: &[u8] = &[0x00, 0x23, 0x00, 0x00];
6805 let ext_body_len = sni_ext.len() + sup_ver.len() + sup_grp.len() + sess_tick.len();
6806 let mut extensions = Vec::new();
6807 extensions.extend_from_slice(&(ext_body_len as u16).to_be_bytes());
6808 extensions.extend_from_slice(&sni_ext);
6809 extensions.extend_from_slice(sup_ver);
6810 extensions.extend_from_slice(sup_grp);
6811 extensions.extend_from_slice(sess_tick);
6812
6813 let mut hello_body = Vec::new();
6814 hello_body.extend_from_slice(&[0x03, 0x03]);
6815 hello_body.extend_from_slice(&[0u8; 32]); hello_body.push(session_id.len() as u8);
6817 hello_body.extend_from_slice(&session_id);
6818 hello_body.extend_from_slice(cipher_suites);
6819 hello_body.extend_from_slice(compression);
6820 hello_body.extend_from_slice(&extensions);
6821
6822 let hs_len = hello_body.len() as u32;
6823 let mut handshake = vec![
6824 0x01,
6825 ((hs_len >> 16) & 0xff) as u8,
6826 ((hs_len >> 8) & 0xff) as u8,
6827 (hs_len & 0xff) as u8,
6828 ];
6829 handshake.extend_from_slice(&hello_body);
6830
6831 let rec_len = handshake.len() as u16;
6832 let mut record = Vec::new();
6833 record.push(0x16);
6834 record.extend_from_slice(&[0x03, 0x01]);
6835 record.extend_from_slice(&rec_len.to_be_bytes());
6836 record.extend_from_slice(&handshake);
6837
6838 use sha2::Digest;
6840 let random_offset = 5 + 4 + 2; let hmac_result: [u8; 32] = {
6842 use hmac::{Hmac, Mac};
6843 type HmacSha256 = Hmac<sha2::Sha256>;
6844 let mut mac = HmacSha256::new_from_slice(secret)
6845 .map_err(|_| InvocationError::Deserialize("HMAC key error".into()))?;
6846 mac.update(&record);
6847 mac.finalize().into_bytes().into()
6848 };
6849 record[random_offset..random_offset + 32].copy_from_slice(&hmac_result);
6850 stream.write_all(&record).await?;
6851
6852 let mut h = sha2::Sha256::new();
6854 h.update(secret.as_ref());
6855 h.update(hmac_result);
6856 let derived: [u8; 32] = h.finalize().into();
6857 let iv = [0u8; 16];
6858 let cipher =
6859 layer_crypto::ObfuscatedCipher::from_keys(&derived, &iv, &derived, &iv);
6860 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
6861 Ok((stream, FrameKind::FakeTls { cipher: cipher_arc }))
6862 }
6863 }
6864 }
6865
6866 async fn connect_raw(
6867 addr: &str,
6868 socks5: Option<&crate::socks5::Socks5Config>,
6869 mtproxy: Option<&crate::proxy::MtProxyConfig>,
6870 transport: &TransportKind,
6871 dc_id: i16,
6872 ) -> Result<Self, InvocationError> {
6873 tracing::debug!("[layer] Connecting to {addr} (DH) …");
6874
6875 let addr2 = addr.to_string();
6876 let socks5_c = socks5.cloned();
6877 let mtproxy_c = mtproxy.cloned();
6878 let transport_c = transport.clone();
6879
6880 let fut = async move {
6881 let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
6882 Self::open_stream_mtproxy(mp, dc_id).await?
6883 } else {
6884 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
6885 };
6886
6887 let mut plain = Session::new();
6888
6889 let (req1, s1) =
6890 auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6891 send_frame(
6892 &mut stream,
6893 &plain.pack(&req1).to_plaintext_bytes(),
6894 &frame_kind,
6895 )
6896 .await?;
6897 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
6898
6899 let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
6900 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6901 send_frame(
6902 &mut stream,
6903 &plain.pack(&req2).to_plaintext_bytes(),
6904 &frame_kind,
6905 )
6906 .await?;
6907 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
6908
6909 let (req3, s3) =
6910 auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6911 send_frame(
6912 &mut stream,
6913 &plain.pack(&req3).to_plaintext_bytes(),
6914 &frame_kind,
6915 )
6916 .await?;
6917 let ans: tl::enums::SetClientDhParamsAnswer =
6918 recv_frame_plain(&mut stream, &frame_kind).await?;
6919
6920 let done = {
6922 let mut result = auth::finish(s3, ans)
6923 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6924 let mut attempts = 0u8;
6925 loop {
6926 match result {
6927 auth::FinishResult::Done(d) => break d,
6928 auth::FinishResult::Retry {
6929 retry_id,
6930 dh_params,
6931 nonce,
6932 server_nonce,
6933 new_nonce,
6934 } => {
6935 attempts += 1;
6936 if attempts >= 5 {
6937 return Err(InvocationError::Deserialize(
6938 "dh_gen_retry exceeded 5 attempts".into(),
6939 ));
6940 }
6941 let (req_retry, s3_retry) = auth::retry_step3(
6942 &dh_params,
6943 nonce,
6944 server_nonce,
6945 new_nonce,
6946 retry_id,
6947 )
6948 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6949 send_frame(
6950 &mut stream,
6951 &plain.pack(&req_retry).to_plaintext_bytes(),
6952 &frame_kind,
6953 )
6954 .await?;
6955 let ans_retry: tl::enums::SetClientDhParamsAnswer =
6956 recv_frame_plain(&mut stream, &frame_kind).await?;
6957 result = auth::finish(s3_retry, ans_retry)
6958 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6959 }
6960 }
6961 }
6962 };
6963 tracing::debug!("[layer] DH complete ✓");
6964
6965 Ok::<Self, InvocationError>(Self {
6966 stream,
6967 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
6968 frame_kind,
6969 })
6970 };
6971
6972 tokio::time::timeout(Duration::from_secs(15), fut)
6973 .await
6974 .map_err(|_| {
6975 InvocationError::Deserialize(format!(
6976 "DH handshake with {addr} timed out after 15 s"
6977 ))
6978 })?
6979 }
6980
6981 async fn connect_with_key(
6982 addr: &str,
6983 auth_key: [u8; 256],
6984 first_salt: i64,
6985 time_offset: i32,
6986 socks5: Option<&crate::socks5::Socks5Config>,
6987 transport: &TransportKind,
6988 dc_id: i16,
6989 ) -> Result<Self, InvocationError> {
6990 let addr2 = addr.to_string();
6991 let socks5_c = socks5.cloned();
6992 let transport_c = transport.clone();
6993
6994 let fut = async move {
6995 let (stream, frame_kind) =
6996 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?;
6997 Ok::<Self, InvocationError>(Self {
6998 stream,
6999 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
7000 frame_kind,
7001 })
7002 };
7003
7004 tokio::time::timeout(Duration::from_secs(15), fut)
7005 .await
7006 .map_err(|_| {
7007 InvocationError::Deserialize(format!(
7008 "connect_with_key to {addr} timed out after 15 s"
7009 ))
7010 })?
7011 }
7012
7013 fn auth_key_bytes(&self) -> [u8; 256] {
7014 self.enc.auth_key_bytes()
7015 }
7016
7017 fn into_writer(self) -> (ConnectionWriter, OwnedWriteHalf, OwnedReadHalf, FrameKind) {
7019 let (read_half, write_half) = self.stream.into_split();
7020 let writer = ConnectionWriter {
7021 enc: self.enc,
7022 frame_kind: self.frame_kind.clone(),
7023 pending_ack: Vec::new(),
7024 sent_bodies: std::collections::HashMap::new(),
7025 container_map: std::collections::HashMap::new(),
7026 salts: Vec::new(),
7027 start_salt_time: None,
7028 };
7029 (writer, write_half, read_half, self.frame_kind)
7030 }
7031}
7032
7033async fn send_frame(
7037 stream: &mut TcpStream,
7038 data: &[u8],
7039 kind: &FrameKind,
7040) -> Result<(), InvocationError> {
7041 match kind {
7042 FrameKind::Abridged => send_abridged(stream, data).await,
7043 FrameKind::Intermediate => {
7044 let mut frame = Vec::with_capacity(4 + data.len());
7045 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
7046 frame.extend_from_slice(data);
7047 stream.write_all(&frame).await?;
7048 Ok(())
7049 }
7050 FrameKind::Full { send_seqno, .. } => {
7051 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7054 let total_len = (data.len() as u32) + 12;
7055 let mut packet = Vec::with_capacity(total_len as usize);
7056 packet.extend_from_slice(&total_len.to_le_bytes());
7057 packet.extend_from_slice(&seq.to_le_bytes());
7058 packet.extend_from_slice(data);
7059 let crc = crate::transport_intermediate::crc32_ieee(&packet);
7060 packet.extend_from_slice(&crc.to_le_bytes());
7061 stream.write_all(&packet).await?;
7062 Ok(())
7063 }
7064 FrameKind::Obfuscated { cipher } => {
7065 let words = data.len() / 4;
7067 let mut frame = if words < 0x7f {
7068 let mut v = Vec::with_capacity(1 + data.len());
7069 v.push(words as u8);
7070 v
7071 } else {
7072 let mut v = Vec::with_capacity(4 + data.len());
7073 v.extend_from_slice(&[
7074 0x7f,
7075 (words & 0xff) as u8,
7076 ((words >> 8) & 0xff) as u8,
7077 ((words >> 16) & 0xff) as u8,
7078 ]);
7079 v
7080 };
7081 frame.extend_from_slice(data);
7082 cipher.lock().await.encrypt(&mut frame);
7083 stream.write_all(&frame).await?;
7084 Ok(())
7085 }
7086 FrameKind::PaddedIntermediate { cipher } => {
7087 let mut pad_len_buf = [0u8; 1];
7089 getrandom::getrandom(&mut pad_len_buf).ok();
7090 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
7091 let total_payload = data.len() + pad_len;
7092 let mut frame = Vec::with_capacity(4 + total_payload);
7093 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
7094 frame.extend_from_slice(data);
7095 let mut pad = vec![0u8; pad_len];
7096 getrandom::getrandom(&mut pad).ok();
7097 frame.extend_from_slice(&pad);
7098 cipher.lock().await.encrypt(&mut frame);
7099 stream.write_all(&frame).await?;
7100 Ok(())
7101 }
7102 FrameKind::FakeTls { cipher } => {
7103 const TLS_APP_DATA: u8 = 0x17;
7107 const TLS_VER: [u8; 2] = [0x03, 0x03];
7108 const CHUNK: usize = 2878;
7110 let mut locked = cipher.lock().await;
7111 for chunk in data.chunks(CHUNK) {
7112 let chunk_len = chunk.len() as u16;
7113 let mut record = Vec::with_capacity(5 + chunk.len());
7114 record.push(TLS_APP_DATA);
7115 record.extend_from_slice(&TLS_VER);
7116 record.extend_from_slice(&chunk_len.to_be_bytes());
7117 record.extend_from_slice(chunk);
7118 locked.encrypt(&mut record[5..]);
7120 stream.write_all(&record).await?;
7121 }
7122 Ok(())
7123 }
7124 }
7125}
7126
7127enum FrameOutcome {
7131 Frame(Vec<u8>),
7132 Error(InvocationError),
7133 Keepalive, }
7135
7136async fn recv_frame_with_keepalive(
7143 rh: &mut OwnedReadHalf,
7144 fk: &FrameKind,
7145 client: &Client,
7146 _ak: &[u8; 256],
7147) -> FrameOutcome {
7148 match tokio::time::timeout(
7149 Duration::from_secs(PING_DELAY_SECS),
7150 recv_frame_read(rh, fk),
7151 )
7152 .await
7153 {
7154 Ok(Ok(raw)) => FrameOutcome::Frame(raw),
7155 Ok(Err(e)) => FrameOutcome::Error(e),
7156 Err(_) => {
7157 let ping_req = tl::functions::PingDelayDisconnect {
7161 ping_id: random_i64(),
7162 disconnect_delay: NO_PING_DISCONNECT,
7163 };
7164 let (wire, fk) = {
7165 let mut w = client.inner.writer.lock().await;
7166 let fk = w.frame_kind.clone();
7167 (w.enc.pack(&ping_req), fk)
7168 };
7169 match send_frame_write(&mut *client.inner.write_half.lock().await, &wire, &fk).await {
7170 Ok(()) => FrameOutcome::Keepalive,
7171 Err(e) => FrameOutcome::Error(e),
7172 }
7173 }
7174 }
7175}
7176
7177async fn send_frame_write(
7184 stream: &mut OwnedWriteHalf,
7185 data: &[u8],
7186 kind: &FrameKind,
7187) -> Result<(), InvocationError> {
7188 match kind {
7189 FrameKind::Abridged => {
7190 let words = data.len() / 4;
7191 let mut frame = if words < 0x7f {
7193 let mut v = Vec::with_capacity(1 + data.len());
7194 v.push(words as u8);
7195 v
7196 } else {
7197 let mut v = Vec::with_capacity(4 + data.len());
7198 v.extend_from_slice(&[
7199 0x7f,
7200 (words & 0xff) as u8,
7201 ((words >> 8) & 0xff) as u8,
7202 ((words >> 16) & 0xff) as u8,
7203 ]);
7204 v
7205 };
7206 frame.extend_from_slice(data);
7207 stream.write_all(&frame).await?;
7208 Ok(())
7209 }
7210 FrameKind::Intermediate => {
7211 let mut frame = Vec::with_capacity(4 + data.len());
7212 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
7213 frame.extend_from_slice(data);
7214 stream.write_all(&frame).await?;
7215 Ok(())
7216 }
7217 FrameKind::Full { send_seqno, .. } => {
7218 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7220 let total_len = (data.len() as u32) + 12;
7221 let mut packet = Vec::with_capacity(total_len as usize);
7222 packet.extend_from_slice(&total_len.to_le_bytes());
7223 packet.extend_from_slice(&seq.to_le_bytes());
7224 packet.extend_from_slice(data);
7225 let crc = crate::transport_intermediate::crc32_ieee(&packet);
7226 packet.extend_from_slice(&crc.to_le_bytes());
7227 stream.write_all(&packet).await?;
7228 Ok(())
7229 }
7230 FrameKind::Obfuscated { cipher } => {
7231 let words = data.len() / 4;
7233 let mut frame = if words < 0x7f {
7234 let mut v = Vec::with_capacity(1 + data.len());
7235 v.push(words as u8);
7236 v
7237 } else {
7238 let mut v = Vec::with_capacity(4 + data.len());
7239 v.extend_from_slice(&[
7240 0x7f,
7241 (words & 0xff) as u8,
7242 ((words >> 8) & 0xff) as u8,
7243 ((words >> 16) & 0xff) as u8,
7244 ]);
7245 v
7246 };
7247 frame.extend_from_slice(data);
7248 cipher.lock().await.encrypt(&mut frame);
7249 stream.write_all(&frame).await?;
7250 Ok(())
7251 }
7252 FrameKind::PaddedIntermediate { cipher } => {
7253 let mut pad_len_buf = [0u8; 1];
7254 getrandom::getrandom(&mut pad_len_buf).ok();
7255 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
7256 let total_payload = data.len() + pad_len;
7257 let mut frame = Vec::with_capacity(4 + total_payload);
7258 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
7259 frame.extend_from_slice(data);
7260 let mut pad = vec![0u8; pad_len];
7261 getrandom::getrandom(&mut pad).ok();
7262 frame.extend_from_slice(&pad);
7263 cipher.lock().await.encrypt(&mut frame);
7264 stream.write_all(&frame).await?;
7265 Ok(())
7266 }
7267 FrameKind::FakeTls { cipher } => {
7268 const TLS_APP_DATA: u8 = 0x17;
7269 const TLS_VER: [u8; 2] = [0x03, 0x03];
7270 const CHUNK: usize = 2878;
7271 let mut locked = cipher.lock().await;
7272 for chunk in data.chunks(CHUNK) {
7273 let chunk_len = chunk.len() as u16;
7274 let mut record = Vec::with_capacity(5 + chunk.len());
7275 record.push(TLS_APP_DATA);
7276 record.extend_from_slice(&TLS_VER);
7277 record.extend_from_slice(&chunk_len.to_be_bytes());
7278 record.extend_from_slice(chunk);
7279 locked.encrypt(&mut record[5..]);
7280 stream.write_all(&record).await?;
7281 }
7282 Ok(())
7283 }
7284 }
7285}
7286
7287async fn recv_frame_read(
7289 stream: &mut OwnedReadHalf,
7290 kind: &FrameKind,
7291) -> Result<Vec<u8>, InvocationError> {
7292 match kind {
7293 FrameKind::Abridged => {
7294 let mut h = [0u8; 1];
7295 stream.read_exact(&mut h).await?;
7296 let words = if h[0] < 0x7f {
7297 h[0] as usize
7298 } else {
7299 let mut b = [0u8; 3];
7300 stream.read_exact(&mut b).await?;
7301 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7302 };
7303 let len = words * 4;
7304 let mut buf = vec![0u8; len];
7305 stream.read_exact(&mut buf).await?;
7306 if buf.len() == 4 {
7307 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
7308 if code < 0 {
7309 return Err(InvocationError::Rpc(RpcError::from_telegram(
7310 code,
7311 "transport error",
7312 )));
7313 }
7314 }
7315 Ok(buf)
7316 }
7317 FrameKind::Intermediate => {
7318 let mut len_buf = [0u8; 4];
7319 stream.read_exact(&mut len_buf).await?;
7320 let len_i32 = i32::from_le_bytes(len_buf);
7321 if len_i32 < 0 {
7329 return Err(InvocationError::Rpc(RpcError::from_telegram(
7330 len_i32,
7331 "transport error",
7332 )));
7333 }
7334 let len = len_i32 as usize;
7335 let mut buf = vec![0u8; len];
7336 stream.read_exact(&mut buf).await?;
7337 Ok(buf)
7338 }
7339 FrameKind::Full { recv_seqno, .. } => {
7340 let mut len_buf = [0u8; 4];
7341 stream.read_exact(&mut len_buf).await?;
7342 let total_len_i32 = i32::from_le_bytes(len_buf);
7343 if total_len_i32 < 0 {
7344 return Err(InvocationError::Rpc(RpcError::from_telegram(
7345 total_len_i32,
7346 "transport error",
7347 )));
7348 }
7349 let total_len = total_len_i32 as usize;
7350 if total_len < 12 {
7351 return Err(InvocationError::Deserialize(
7352 "Full transport: packet too short".into(),
7353 ));
7354 }
7355 let mut rest = vec![0u8; total_len - 4];
7356 stream.read_exact(&mut rest).await?;
7357 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
7358 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
7359 let mut check_input = Vec::with_capacity(4 + body.len());
7360 check_input.extend_from_slice(&len_buf);
7361 check_input.extend_from_slice(body);
7362 let actual_crc = crate::transport_intermediate::crc32_ieee(&check_input);
7363 if actual_crc != expected_crc {
7364 return Err(InvocationError::Deserialize(format!(
7365 "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
7366 )));
7367 }
7368 let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
7369 let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7370 if recv_seq != expected_seq {
7371 return Err(InvocationError::Deserialize(format!(
7372 "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
7373 )));
7374 }
7375 Ok(body[4..].to_vec())
7376 }
7377 FrameKind::Obfuscated { cipher } => {
7378 let mut h = [0u8; 1];
7379 stream.read_exact(&mut h).await?;
7380 cipher.lock().await.decrypt(&mut h);
7381 let words = if h[0] < 0x7f {
7382 h[0] as usize
7383 } else {
7384 let mut b = [0u8; 3];
7385 stream.read_exact(&mut b).await?;
7386 cipher.lock().await.decrypt(&mut b);
7387 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7388 };
7389 let mut buf = vec![0u8; words * 4];
7390 stream.read_exact(&mut buf).await?;
7391 cipher.lock().await.decrypt(&mut buf);
7392 if buf.len() == 4 {
7393 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
7394 if code < 0 {
7395 return Err(InvocationError::Rpc(RpcError::from_telegram(
7396 code,
7397 "transport error",
7398 )));
7399 }
7400 }
7401 Ok(buf)
7402 }
7403 FrameKind::PaddedIntermediate { cipher } => {
7404 let mut len_buf = [0u8; 4];
7406 stream.read_exact(&mut len_buf).await?;
7407 cipher.lock().await.decrypt(&mut len_buf);
7408 let total_len = i32::from_le_bytes(len_buf);
7409 if total_len < 0 {
7410 return Err(InvocationError::Rpc(RpcError::from_telegram(
7411 total_len,
7412 "transport error",
7413 )));
7414 }
7415 let mut buf = vec![0u8; total_len as usize];
7416 stream.read_exact(&mut buf).await?;
7417 cipher.lock().await.decrypt(&mut buf);
7418 Ok(buf)
7422 }
7423 FrameKind::FakeTls { cipher } => {
7424 let mut hdr = [0u8; 5];
7426 stream.read_exact(&mut hdr).await?;
7427 if hdr[0] != 0x17 {
7428 return Err(InvocationError::Deserialize(format!(
7429 "FakeTLS: unexpected record type 0x{:02x}",
7430 hdr[0]
7431 )));
7432 }
7433 let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
7434 let mut buf = vec![0u8; payload_len];
7435 stream.read_exact(&mut buf).await?;
7436 cipher.lock().await.decrypt(&mut buf);
7437 Ok(buf)
7438 }
7439 }
7440}
7441
7442async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
7444 let words = data.len() / 4;
7445 let mut frame = if words < 0x7f {
7447 let mut v = Vec::with_capacity(1 + data.len());
7448 v.push(words as u8);
7449 v
7450 } else {
7451 let mut v = Vec::with_capacity(4 + data.len());
7452 v.extend_from_slice(&[
7453 0x7f,
7454 (words & 0xff) as u8,
7455 ((words >> 8) & 0xff) as u8,
7456 ((words >> 16) & 0xff) as u8,
7457 ]);
7458 v
7459 };
7460 frame.extend_from_slice(data);
7461 stream.write_all(&frame).await?;
7462 Ok(())
7463}
7464
7465async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
7466 let mut h = [0u8; 1];
7467 stream.read_exact(&mut h).await?;
7468 let words = if h[0] < 0x7f {
7469 h[0] as usize
7470 } else {
7471 let mut b = [0u8; 3];
7472 stream.read_exact(&mut b).await?;
7473 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
7474 if w == 1 {
7476 let mut code_buf = [0u8; 4];
7477 stream.read_exact(&mut code_buf).await?;
7478 let code = i32::from_le_bytes(code_buf);
7479 return Err(InvocationError::Rpc(RpcError::from_telegram(
7480 code,
7481 "transport error",
7482 )));
7483 }
7484 w
7485 };
7486 if words == 0 || words > 0x8000 {
7489 return Err(InvocationError::Deserialize(format!(
7490 "abridged: implausible word count {words} (possible transport error or framing mismatch)"
7491 )));
7492 }
7493 let mut buf = vec![0u8; words * 4];
7494 stream.read_exact(&mut buf).await?;
7495 Ok(buf)
7496}
7497
7498async fn recv_frame_plain<T: Deserializable>(
7500 stream: &mut TcpStream,
7501 kind: &FrameKind,
7502) -> Result<T, InvocationError> {
7503 let raw = match kind {
7510 FrameKind::Abridged => recv_abridged(stream).await?,
7511 FrameKind::Intermediate => {
7512 let mut len_buf = [0u8; 4];
7513 stream.read_exact(&mut len_buf).await?;
7514 let len = u32::from_le_bytes(len_buf) as usize;
7515 if len == 0 || len > 1 << 24 {
7516 return Err(InvocationError::Deserialize(format!(
7517 "plaintext frame: implausible length {len}"
7518 )));
7519 }
7520 let mut buf = vec![0u8; len];
7521 stream.read_exact(&mut buf).await?;
7522 buf
7523 }
7524 FrameKind::Full { recv_seqno, .. } => {
7525 let mut len_buf = [0u8; 4];
7527 stream.read_exact(&mut len_buf).await?;
7528 let total_len = u32::from_le_bytes(len_buf) as usize;
7529 if !(12..=(1 << 24) + 12).contains(&total_len) {
7530 return Err(InvocationError::Deserialize(format!(
7531 "Full plaintext frame: implausible total_len {total_len}"
7532 )));
7533 }
7534 let mut rest = vec![0u8; total_len - 4];
7535 stream.read_exact(&mut rest).await?;
7536
7537 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
7539 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
7540 let mut check_input = Vec::with_capacity(4 + body.len());
7541 check_input.extend_from_slice(&len_buf);
7542 check_input.extend_from_slice(body);
7543 let actual_crc = crate::transport_intermediate::crc32_ieee(&check_input);
7544 if actual_crc != expected_crc {
7545 return Err(InvocationError::Deserialize(format!(
7546 "Full plaintext: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
7547 )));
7548 }
7549
7550 let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
7552 let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7553 if recv_seq != expected_seq {
7554 return Err(InvocationError::Deserialize(format!(
7555 "Full plaintext: seqno mismatch (got {recv_seq}, expected {expected_seq})"
7556 )));
7557 }
7558
7559 body[4..].to_vec()
7560 }
7561 FrameKind::Obfuscated { cipher } => {
7562 let mut h = [0u8; 1];
7564 stream.read_exact(&mut h).await?;
7565 cipher.lock().await.decrypt(&mut h);
7566 let words = if h[0] < 0x7f {
7567 h[0] as usize
7568 } else {
7569 let mut b = [0u8; 3];
7570 stream.read_exact(&mut b).await?;
7571 cipher.lock().await.decrypt(&mut b);
7572 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7573 };
7574 let mut buf = vec![0u8; words * 4];
7575 stream.read_exact(&mut buf).await?;
7576 cipher.lock().await.decrypt(&mut buf);
7577 buf
7578 }
7579 FrameKind::PaddedIntermediate { cipher } => {
7580 let mut len_buf = [0u8; 4];
7581 stream.read_exact(&mut len_buf).await?;
7582 cipher.lock().await.decrypt(&mut len_buf);
7583 let len = u32::from_le_bytes(len_buf) as usize;
7584 if len == 0 || len > 1 << 24 {
7585 return Err(InvocationError::Deserialize(format!(
7586 "PaddedIntermediate plaintext: implausible length {len}"
7587 )));
7588 }
7589 let mut buf = vec![0u8; len];
7590 stream.read_exact(&mut buf).await?;
7591 cipher.lock().await.decrypt(&mut buf);
7592 buf
7593 }
7594 FrameKind::FakeTls { cipher } => {
7595 let mut hdr = [0u8; 5];
7596 stream.read_exact(&mut hdr).await?;
7597 if hdr[0] != 0x17 {
7598 return Err(InvocationError::Deserialize(format!(
7599 "FakeTLS plaintext: unexpected record type 0x{:02x}",
7600 hdr[0]
7601 )));
7602 }
7603 let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
7604 let mut buf = vec![0u8; payload_len];
7605 stream.read_exact(&mut buf).await?;
7606 cipher.lock().await.decrypt(&mut buf);
7607 buf
7608 }
7609 };
7610 if raw.len() < 20 {
7611 return Err(InvocationError::Deserialize(
7612 "plaintext frame too short".into(),
7613 ));
7614 }
7615 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
7616 return Err(InvocationError::Deserialize(
7617 "expected auth_key_id=0 in plaintext".into(),
7618 ));
7619 }
7620 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
7621 if 20 + body_len > raw.len() {
7622 return Err(InvocationError::Deserialize(
7623 "plaintext frame: body_len exceeds frame size".into(),
7624 ));
7625 }
7626 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
7627 T::deserialize(&mut cur).map_err(Into::into)
7628}
7629
7630enum EnvelopeResult {
7633 Payload(Vec<u8>),
7634 RawUpdates(Vec<Vec<u8>>),
7636 Pts(i32, i32),
7638 None,
7639}
7640
7641fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
7642 if body.len() < 4 {
7643 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
7644 }
7645 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
7646
7647 match cid {
7648 ID_RPC_RESULT => {
7649 if body.len() < 12 {
7650 return Err(InvocationError::Deserialize("rpc_result too short".into()));
7651 }
7652 unwrap_envelope(body[12..].to_vec())
7653 }
7654 ID_RPC_ERROR => {
7655 if body.len() < 8 {
7656 return Err(InvocationError::Deserialize("rpc_error too short".into()));
7657 }
7658 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
7659 let message = tl_read_string(&body[8..]).unwrap_or_default();
7660 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
7661 }
7662 ID_MSG_CONTAINER => {
7663 if body.len() < 8 {
7664 return Err(InvocationError::Deserialize("container too short".into()));
7665 }
7666 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
7667 let mut pos = 8usize;
7668 let mut payload: Option<Vec<u8>> = None;
7669 let mut raw_updates: Vec<Vec<u8>> = Vec::new();
7670
7671 for _ in 0..count {
7672 if pos + 16 > body.len() { break; }
7673 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
7674 pos += 16;
7675 if pos + inner_len > body.len() { break; }
7676 let inner = body[pos..pos + inner_len].to_vec();
7677 pos += inner_len;
7678 match unwrap_envelope(inner)? {
7679 EnvelopeResult::Payload(p) => { payload = Some(p); }
7680 EnvelopeResult::RawUpdates(mut raws) => { raw_updates.append(&mut raws); }
7681 EnvelopeResult::Pts(_, _) => {} EnvelopeResult::None => {}
7683 }
7684 }
7685 if let Some(p) = payload {
7686 Ok(EnvelopeResult::Payload(p))
7687 } else if !raw_updates.is_empty() {
7688 Ok(EnvelopeResult::RawUpdates(raw_updates))
7689 } else {
7690 Ok(EnvelopeResult::None)
7691 }
7692 }
7693 ID_GZIP_PACKED => {
7694 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
7695 unwrap_envelope(gz_inflate(&bytes)?)
7696 }
7697 ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
7702 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
7715 Ok(EnvelopeResult::None)
7716 }
7717 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
7723 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
7724 | ID_UPDATES_TOO_LONG => {
7725 Ok(EnvelopeResult::RawUpdates(vec![body]))
7726 }
7727 ID_UPDATE_SHORT_SENT_MSG => {
7734 let mut cur = Cursor::from_slice(&body[4..]);
7735 match tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
7736 Ok(m) => {
7737 tracing::debug!(
7738 "[layer] updateShortSentMessage (RPC): pts={} pts_count={}: advancing pts",
7739 m.pts, m.pts_count
7740 );
7741 Ok(EnvelopeResult::Pts(m.pts, m.pts_count))
7742 }
7743 Err(e) => {
7744 tracing::debug!("[layer] updateShortSentMessage deserialize error: {e}");
7745 Ok(EnvelopeResult::None)
7746 }
7747 }
7748 }
7749 _ => Ok(EnvelopeResult::Payload(body)),
7750 }
7751}
7752
7753fn random_i64() -> i64 {
7756 let mut b = [0u8; 8];
7757 getrandom::getrandom(&mut b).expect("getrandom");
7758 i64::from_le_bytes(b)
7759}
7760
7761fn jitter_delay(base_ms: u64) -> Duration {
7765 let mut b = [0u8; 2];
7767 getrandom::getrandom(&mut b).unwrap_or(());
7768 let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; let factor = 0.80 + rand_frac * 0.40; Duration::from_millis((base_ms as f64 * factor) as u64)
7771}
7772
7773pub(crate) fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
7774 if data.is_empty() {
7775 return Some(vec![]);
7776 }
7777 let (len, start) = if data[0] < 254 {
7778 (data[0] as usize, 1)
7779 } else if data.len() >= 4 {
7780 (
7781 data[1] as usize | (data[2] as usize) << 8 | (data[3] as usize) << 16,
7782 4,
7783 )
7784 } else {
7785 return None;
7786 };
7787 if data.len() < start + len {
7788 return None;
7789 }
7790 Some(data[start..start + len].to_vec())
7791}
7792
7793fn tl_read_string(data: &[u8]) -> Option<String> {
7794 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
7795}
7796
7797pub(crate) fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
7798 use std::io::Read;
7799 let mut out = Vec::new();
7800 if flate2::read::GzDecoder::new(data)
7801 .read_to_end(&mut out)
7802 .is_ok()
7803 && !out.is_empty()
7804 {
7805 return Ok(out);
7806 }
7807 out.clear();
7808 flate2::read::ZlibDecoder::new(data)
7809 .read_to_end(&mut out)
7810 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
7811 Ok(out)
7812}
7813
7814pub(crate) fn maybe_gz_decompress(body: Vec<u8>) -> Result<Vec<u8>, InvocationError> {
7815 const ID_GZIP_PACKED_LOCAL: u32 = 0x3072cfa1;
7816 if body.len() >= 4 && u32::from_le_bytes(body[0..4].try_into().unwrap()) == ID_GZIP_PACKED_LOCAL
7817 {
7818 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
7819 gz_inflate(&bytes)
7820 } else {
7821 Ok(body)
7822 }
7823}
7824
7825const COMPRESSION_THRESHOLD: usize = 512;
7830
7831fn tl_write_bytes(data: &[u8]) -> Vec<u8> {
7833 let len = data.len();
7834 let mut out = Vec::with_capacity(4 + len);
7835 if len < 254 {
7836 out.push(len as u8);
7837 out.extend_from_slice(data);
7838 let pad = (4 - (1 + len) % 4) % 4;
7839 out.extend(std::iter::repeat_n(0u8, pad));
7840 } else {
7841 out.push(0xfe);
7842 out.extend_from_slice(&(len as u32).to_le_bytes()[..3]);
7843 out.extend_from_slice(data);
7844 let pad = (4 - (4 + len) % 4) % 4;
7845 out.extend(std::iter::repeat_n(0u8, pad));
7846 }
7847 out
7848}
7849
7850fn gz_pack_body(data: &[u8]) -> Vec<u8> {
7852 use std::io::Write;
7853 let mut enc = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
7854 let _ = enc.write_all(data);
7855 let compressed = enc.finish().unwrap_or_default();
7856 let mut out = Vec::with_capacity(4 + 4 + compressed.len());
7857 out.extend_from_slice(&ID_GZIP_PACKED.to_le_bytes());
7858 out.extend(tl_write_bytes(&compressed));
7859 out
7860}
7861
7862fn maybe_gz_pack(data: &[u8]) -> Vec<u8> {
7865 if data.len() <= COMPRESSION_THRESHOLD {
7866 return data.to_vec();
7867 }
7868 let packed = gz_pack_body(data);
7869 if packed.len() < data.len() {
7870 packed
7871 } else {
7872 data.to_vec()
7873 }
7874}
7875
7876fn build_msgs_ack_body(msg_ids: &[i64]) -> Vec<u8> {
7880 let mut out = Vec::with_capacity(4 + 4 + 4 + msg_ids.len() * 8);
7883 out.extend_from_slice(&ID_MSGS_ACK.to_le_bytes());
7884 out.extend_from_slice(&0x1cb5c415_u32.to_le_bytes()); out.extend_from_slice(&(msg_ids.len() as u32).to_le_bytes());
7886 for &id in msg_ids {
7887 out.extend_from_slice(&id.to_le_bytes());
7888 }
7889 out
7890}
7891
7892fn build_container_body(messages: &[(i64, i32, &[u8])]) -> Vec<u8> {
7900 let total_body: usize = messages.iter().map(|(_, _, b)| 16 + b.len()).sum();
7901 let mut out = Vec::with_capacity(8 + total_body);
7902 out.extend_from_slice(&ID_MSG_CONTAINER.to_le_bytes());
7903 out.extend_from_slice(&(messages.len() as u32).to_le_bytes());
7904 for &(msg_id, seqno, body) in messages {
7905 out.extend_from_slice(&msg_id.to_le_bytes());
7906 out.extend_from_slice(&seqno.to_le_bytes());
7907 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
7908 out.extend_from_slice(body);
7909 }
7910 out
7911}