1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc(html_root_url = "https://docs.rs/layer-client/0.4.6")]
3#![deny(unsafe_code)]
23
24pub mod builder;
25mod errors;
26pub mod media;
27pub mod parsers;
28pub mod participants;
29pub mod pts;
30mod restart;
31mod retry;
32mod session;
33mod transport;
34mod two_factor_auth;
35pub mod update;
36
37pub mod dc_pool;
38pub mod inline_iter;
39pub mod keyboard;
40pub mod search;
41pub mod session_backend;
42pub mod socks5;
43pub mod transport_intermediate;
44pub mod transport_obfuscated;
45pub mod types;
46pub mod typing_guard;
47
48#[macro_use]
49pub mod macros;
50pub mod peer_ref;
51pub mod reactions;
52
53#[cfg(test)]
54mod pts_tests;
55
56pub mod dc_migration;
57pub mod proxy;
58
59pub use builder::{BuilderError, ClientBuilder};
60pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
61pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
62pub use media::{Document, DownloadIter, Downloadable, Photo, Sticker, UploadedFile};
63pub use participants::{Participant, ProfilePhotoIter};
64pub use peer_ref::PeerRef;
65pub use proxy::{MtProxyConfig, parse_proxy_link};
66pub use restart::{ConnectionRestartPolicy, FixedInterval, NeverRestart};
67use retry::RetryLoop;
68pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
69pub use search::{GlobalSearchBuilder, SearchBuilder};
70pub use session::{DcEntry, DcFlags};
71#[cfg(feature = "libsql-session")]
72#[cfg_attr(docsrs, doc(cfg(feature = "libsql-session")))]
73pub use session_backend::LibSqlBackend;
74#[cfg(feature = "sqlite-session")]
75#[cfg_attr(docsrs, doc(cfg(feature = "sqlite-session")))]
76pub use session_backend::SqliteBackend;
77pub use session_backend::{
78 BinaryFileBackend, InMemoryBackend, SessionBackend, StringSessionBackend, UpdateStateChange,
79};
80pub use socks5::Socks5Config;
81pub use types::ChannelKind;
82pub use types::{Channel, Chat, Group, User};
83pub use typing_guard::TypingGuard;
84pub use update::Update;
85pub use update::{ChatActionUpdate, UserStatusUpdate};
86
87pub use layer_tl_types as tl;
90
91use std::collections::HashMap;
92use std::collections::VecDeque;
93use std::num::NonZeroU32;
94use std::ops::ControlFlow;
95use std::sync::Arc;
96use std::time::Duration;
97
98use layer_mtproto::{EncryptedSession, Session, authentication as auth};
99use layer_tl_types::{Cursor, Deserializable, RemoteCall};
100use session::PersistedSession;
101use socket2::TcpKeepalive;
102use tokio::io::{AsyncReadExt, AsyncWriteExt};
103use tokio::net::TcpStream;
104use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
105use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
106use tokio::time::sleep;
107use tokio_util::sync::CancellationToken;
108
// 32-bit identifiers that incoming payloads are compared against.
// NOTE(review): the names mirror MTProto service-message and `Updates`
// constructors; the values are not cross-checked against the TL schema here.

// RPC plumbing and service messages.
const ID_RPC_RESULT: u32 = 0xf35c6d01;
const ID_RPC_ERROR: u32 = 0x2144ca19;
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc;
const ID_GZIP_PACKED: u32 = 0x3072cfa1;
const ID_PONG: u32 = 0x347773c5;
const ID_MSGS_ACK: u32 = 0x62d6b459;
const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
const ID_NEW_SESSION: u32 = 0x9ec20908;
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811;
const ID_FUTURE_SALTS: u32 = 0xae500895;
const ID_MSG_DETAILED_INFO: u32 = 0x276d3ec6;
const ID_MSG_NEW_DETAIL_INFO: u32 = 0x809db6df;
const ID_MSG_RESEND_REQ: u32 = 0x7d861a08;
// Update envelopes.
const ID_UPDATES: u32 = 0x74ae4240;
const ID_UPDATE_SHORT: u32 = 0x78d4dec1;
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
const ID_UPDATE_SHORT_SENT_MSG: u32 = 0x9015e101;
const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e;

// Presumably the interval between keep-alive pings, in seconds (usage is outside this chunk).
const PING_DELAY_SECS: u64 = 60;

// Presumably the server-side disconnect delay sent with each ping, in seconds — TODO confirm.
const NO_PING_DISCONNECT: i32 = 75;

// Presumably the initial reconnect back-off, in milliseconds — TODO confirm against the reader task.
const RECONNECT_BASE_MS: u64 = 500;

// Presumably the upper bound for the reconnect back-off, in seconds — TODO confirm.
const RECONNECT_MAX_SECS: u64 = 5;

// TCP keep-alive tuning; presumably fed into `socket2::TcpKeepalive` where the socket is created.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
const TCP_KEEPALIVE_PROBES: u32 = 3;
162
/// In-memory lookup table from peer IDs to their `access_hash`.
///
/// The cache is filled from `User` / `Chat` objects seen in responses and is
/// consulted when an `InputPeer` has to be built from a bare ID.
#[derive(Debug, Clone, Default)]
pub struct PeerCache {
    /// User ID → access hash.
    pub users: HashMap<i64, i64>,
    /// Channel ID → access hash.
    pub channels: HashMap<i64, i64>,
}
177
178impl PeerCache {
179 fn cache_user(&mut self, user: &tl::enums::User) {
180 if let tl::enums::User::User(u) = user
181 && let Some(hash) = u.access_hash
182 {
183 self.users.insert(u.id, hash);
184 }
185 }
186
187 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
188 match chat {
189 tl::enums::Chat::Channel(c) => {
190 if let Some(hash) = c.access_hash {
191 self.channels.insert(c.id, hash);
192 }
193 }
194 tl::enums::Chat::ChannelForbidden(c) => {
195 self.channels.insert(c.id, c.access_hash);
196 }
197 _ => {}
198 }
199 }
200
201 fn cache_users(&mut self, users: &[tl::enums::User]) {
202 for u in users {
203 self.cache_user(u);
204 }
205 }
206
207 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
208 for c in chats {
209 self.cache_chat(c);
210 }
211 }
212
213 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
214 if user_id == 0 {
215 return tl::enums::InputPeer::PeerSelf;
216 }
217 let hash = self.users.get(&user_id).copied().unwrap_or_else(|| {
218 tracing::warn!("[layer] PeerCache: no access_hash for user {user_id}, using 0: may cause USER_ID_INVALID");
219 0
220 });
221 tl::enums::InputPeer::User(tl::types::InputPeerUser {
222 user_id,
223 access_hash: hash,
224 })
225 }
226
227 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
228 let hash = self.channels.get(&channel_id).copied().unwrap_or_else(|| {
229 tracing::warn!("[layer] PeerCache: no access_hash for channel {channel_id}, using 0: may cause CHANNEL_INVALID");
230 0
231 });
232 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
233 channel_id,
234 access_hash: hash,
235 })
236 }
237
238 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
239 match peer {
240 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
241 tl::enums::Peer::Chat(c) => {
242 tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id })
243 }
244 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
245 }
246 }
247}
248
249#[derive(Clone, Default)]
261pub struct InputMessage {
262 pub text: String,
263 pub reply_to: Option<i32>,
264 pub silent: bool,
265 pub background: bool,
266 pub clear_draft: bool,
267 pub no_webpage: bool,
268 pub invert_media: bool,
270 pub schedule_once_online: bool,
272 pub entities: Option<Vec<tl::enums::MessageEntity>>,
273 pub reply_markup: Option<tl::enums::ReplyMarkup>,
274 pub schedule_date: Option<i32>,
275 pub media: Option<tl::enums::InputMedia>,
278}
279
280impl InputMessage {
281 pub fn text(text: impl Into<String>) -> Self {
283 Self {
284 text: text.into(),
285 ..Default::default()
286 }
287 }
288
289 pub fn set_text(mut self, text: impl Into<String>) -> Self {
291 self.text = text.into();
292 self
293 }
294
295 pub fn reply_to(mut self, id: Option<i32>) -> Self {
297 self.reply_to = id;
298 self
299 }
300
301 pub fn silent(mut self, v: bool) -> Self {
303 self.silent = v;
304 self
305 }
306
307 pub fn background(mut self, v: bool) -> Self {
309 self.background = v;
310 self
311 }
312
313 pub fn clear_draft(mut self, v: bool) -> Self {
315 self.clear_draft = v;
316 self
317 }
318
319 pub fn no_webpage(mut self, v: bool) -> Self {
321 self.no_webpage = v;
322 self
323 }
324
325 pub fn invert_media(mut self, v: bool) -> Self {
327 self.invert_media = v;
328 self
329 }
330
331 pub fn schedule_once_online(mut self) -> Self {
336 self.schedule_once_online = true;
337 self.schedule_date = None;
338 self
339 }
340
341 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
343 self.entities = Some(e);
344 self
345 }
346
347 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
349 self.reply_markup = Some(rm);
350 self
351 }
352
353 pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
363 self.reply_markup = Some(kb.into());
364 self
365 }
366
367 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
369 self.schedule_date = ts;
370 self
371 }
372
373 pub fn copy_media(mut self, media: tl::enums::InputMedia) -> Self {
389 self.media = Some(media);
390 self
391 }
392
393 pub fn clear_media(mut self) -> Self {
395 self.media = None;
396 self
397 }
398
399 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
400 self.reply_to.map(|id| {
401 tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
402 reply_to_msg_id: id,
403 top_msg_id: None,
404 reply_to_peer_id: None,
405 quote_text: None,
406 quote_entities: None,
407 quote_offset: None,
408 monoforum_peer_id: None,
409 todo_item_id: None,
410 poll_option: None,
411 })
412 })
413 }
414}
415
416impl From<&str> for InputMessage {
417 fn from(s: &str) -> Self {
418 Self::text(s)
419 }
420}
421
422impl From<String> for InputMessage {
423 fn from(s: String) -> Self {
424 Self::text(s)
425 }
426}
427
/// Wire transport used for the connection to a data centre.
///
/// NOTE(review): the framing behind each variant is implemented elsewhere;
/// the descriptions below only restate what the variant carries.
#[derive(Clone, Debug)]
pub enum TransportKind {
    /// Abridged transport.
    Abridged,
    /// Intermediate transport.
    Intermediate,
    /// Full transport.
    Full,
    /// Obfuscated transport with an optional 16-byte secret.
    Obfuscated { secret: Option<[u8; 16]> },
    /// Padded-intermediate transport with an optional 16-byte secret.
    PaddedIntermediate { secret: Option<[u8; 16]> },
    /// Fake-TLS transport: a 16-byte secret plus the domain name to use.
    FakeTls { secret: [u8; 16], domain: String },
}

/// The default transport is `Obfuscated` without a secret.
impl Default for TransportKind {
    fn default() -> Self {
        Self::Obfuscated { secret: None }
    }
}
487
/// Cancellation handle returned by [`Client::connect`] alongside the client.
///
/// It is a plain alias for `tokio_util`'s `CancellationToken`; `connect`
/// passes a clone of the same token to the background reader task.
pub type ShutdownToken = CancellationToken;
508
509#[derive(Clone)]
511pub struct Config {
512 pub api_id: i32,
513 pub api_hash: String,
514 pub dc_addr: Option<String>,
515 pub retry_policy: Arc<dyn RetryPolicy>,
516 pub socks5: Option<crate::socks5::Socks5Config>,
518 pub mtproxy: Option<crate::proxy::MtProxyConfig>,
522 pub allow_ipv6: bool,
524 pub transport: TransportKind,
526 pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
528 pub catch_up: bool,
532 pub restart_policy: Arc<dyn ConnectionRestartPolicy>,
533}
534
535impl Config {
536 pub fn with_string_session(s: impl Into<String>) -> Self {
551 Config {
552 session_backend: Arc::new(crate::session_backend::StringSessionBackend::new(s)),
553 ..Config::default()
554 }
555 }
556
557 pub fn proxy_link(mut self, url: &str) -> Self {
575 if url.is_empty() {
576 return self;
577 }
578 let cfg = crate::proxy::parse_proxy_link(url)
579 .unwrap_or_else(|| panic!("invalid MTProxy link: {url:?}"));
580 self.mtproxy = Some(cfg);
581 self
582 }
583
584 pub fn proxy(self, host: impl Into<String>, port: u16, secret: &str) -> Self {
601 let host = host.into();
602 let url = format!("tg://proxy?server={host}&port={port}&secret={secret}");
603 self.proxy_link(&url)
604 }
605
606 pub fn socks5(mut self, addr: impl Into<String>) -> Self {
619 self.socks5 = Some(crate::socks5::Socks5Config::new(addr));
620 self
621 }
622
623 pub fn socks5_auth(
636 mut self,
637 addr: impl Into<String>,
638 username: impl Into<String>,
639 password: impl Into<String>,
640 ) -> Self {
641 self.socks5 = Some(crate::socks5::Socks5Config::with_auth(
642 addr, username, password,
643 ));
644 self
645 }
646}
647
648impl Default for Config {
649 fn default() -> Self {
650 Self {
651 api_id: 0,
652 api_hash: String::new(),
653 dc_addr: None,
654 retry_policy: Arc::new(AutoSleep::default()),
655 socks5: None,
656 mtproxy: None,
657 allow_ipv6: false,
658 transport: TransportKind::Obfuscated { secret: None },
659 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new(
660 "layer.session",
661 )),
662 catch_up: false,
663 restart_policy: Arc::new(NeverRestart),
664 }
665 }
666}
667
668pub struct UpdateStream {
673 rx: mpsc::UnboundedReceiver<update::Update>,
674}
675
676impl UpdateStream {
677 pub async fn next(&mut self) -> Option<update::Update> {
679 self.rx.recv().await
680 }
681
682 pub async fn next_raw(&mut self) -> Option<update::RawUpdate> {
688 loop {
689 match self.rx.recv().await? {
690 update::Update::Raw(r) => return Some(r),
691 _ => continue,
692 }
693 }
694 }
695}
696
697#[derive(Debug, Clone)]
701pub struct Dialog {
702 pub raw: tl::enums::Dialog,
703 pub message: Option<tl::enums::Message>,
704 pub entity: Option<tl::enums::User>,
705 pub chat: Option<tl::enums::Chat>,
706}
707
708impl Dialog {
709 pub fn title(&self) -> String {
711 if let Some(tl::enums::User::User(u)) = &self.entity {
712 let first = u.first_name.as_deref().unwrap_or("");
713 let last = u.last_name.as_deref().unwrap_or("");
714 let name = format!("{first} {last}").trim().to_string();
715 if !name.is_empty() {
716 return name;
717 }
718 }
719 if let Some(chat) = &self.chat {
720 return match chat {
721 tl::enums::Chat::Chat(c) => c.title.clone(),
722 tl::enums::Chat::Forbidden(c) => c.title.clone(),
723 tl::enums::Chat::Channel(c) => c.title.clone(),
724 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
725 tl::enums::Chat::Empty(_) => "(empty)".into(),
726 };
727 }
728 "(Unknown)".to_string()
729 }
730
731 pub fn peer(&self) -> Option<&tl::enums::Peer> {
733 match &self.raw {
734 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
735 tl::enums::Dialog::Folder(_) => None,
736 }
737 }
738
739 pub fn unread_count(&self) -> i32 {
741 match &self.raw {
742 tl::enums::Dialog::Dialog(d) => d.unread_count,
743 _ => 0,
744 }
745 }
746
747 pub fn top_message(&self) -> i32 {
749 match &self.raw {
750 tl::enums::Dialog::Dialog(d) => d.top_message,
751 _ => 0,
752 }
753 }
754}
755
/// State shared by every clone of [`Client`] (held behind an `Arc`).
struct ClientInner {
    /// Writer-side connection state produced by `Connection::into_writer`;
    /// its `enc` field exposes the auth key and session id.
    writer: Mutex<ConnectionWriter>,
    /// Write half of the TCP stream belonging to `writer`.
    write_half: Mutex<OwnedWriteHalf>,
    /// Requests awaiting an answer: each entry resolves with the raw response
    /// bytes or an error. Presumably keyed by message id — TODO confirm.
    #[allow(clippy::type_complexity)]
    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Hands a replacement connection to the reader task as
    /// `(read half, frame kind, auth key bytes, session id)`.
    reconnect_tx: mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Unit signals whose receiver is owned by the reader task.
    /// NOTE(review): presumably a "network changed" nudge — confirm at the send sites.
    network_hint_tx: mpsc::UnboundedSender<()>,
    /// Clone of the token returned from `Client::connect`.
    #[allow(dead_code)]
    shutdown_token: CancellationToken,
    /// Copy of `Config::catch_up`.
    #[allow(dead_code)]
    catch_up: bool,
    /// Copy of `Config::restart_policy`.
    restart_policy: Arc<dyn ConnectionRestartPolicy>,
    /// ID of the data centre the main connection talks to.
    home_dc_id: Mutex<i32>,
    /// Known regular data centres, keyed by DC id.
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Known media-only / CDN data centres, keyed by DC id.
    media_dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache for users and channels.
    pub peer_cache: RwLock<PeerCache>,
    /// Update sequence state (`pts`, `qts`, `date`, `seq`, per-channel pts).
    pub pts_state: Mutex<pts::PtsState>,
    /// Buffer of updates held back by the gap handling in `pts` (details live in that module).
    pub possible_gap: Mutex<pts::PossibleGapBuffer>,
    /// Copy of `Config::api_id`.
    api_id: i32,
    /// Copy of `Config::api_hash`.
    api_hash: String,
    /// Copy of `Config::retry_policy`.
    retry_policy: Arc<dyn RetryPolicy>,
    /// Copy of `Config::socks5`.
    socks5: Option<crate::socks5::Socks5Config>,
    /// Copy of `Config::mtproxy`.
    mtproxy: Option<crate::proxy::MtProxyConfig>,
    /// Copy of `Config::allow_ipv6`.
    allow_ipv6: bool,
    /// Copy of `Config::transport`.
    transport: TransportKind,
    /// Backend the session is persisted to.
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Pool of connections to other data centres, seeded from `dc_options` in `connect`.
    dc_pool: Mutex<dc_pool::DcPool>,
    /// Sending side of the bounded update channel created in `connect`.
    update_tx: mpsc::Sender<update::Update>,
    /// Set to `true` after a successful `bot_sign_in`.
    pub is_bot: std::sync::atomic::AtomicBool,
    /// NOTE(review): presumably marks that an update stream has been handed out — confirm at use sites.
    stream_active: std::sync::atomic::AtomicBool,
    /// NOTE(review): presumably guards against concurrent salt requests — confirm at use sites.
    salt_request_in_flight: std::sync::atomic::AtomicBool,
    /// Claimed via compare-exchange in `connect` while a replacement auth key is being negotiated.
    dh_in_progress: std::sync::atomic::AtomicBool,
}
817
/// Cheaply cloneable handle to a connected client; all clones share one [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    /// Shared connection and session state.
    pub(crate) inner: Arc<ClientInner>,
    /// Receiving end of the update channel created in `connect`.
    /// NOTE(review): presumably kept here so the channel stays open until a consumer takes it — confirm.
    _update_rx: Arc<Mutex<mpsc::Receiver<update::Update>>>,
}
824
825impl Client {
826 pub fn builder() -> crate::builder::ClientBuilder {
841 crate::builder::ClientBuilder::default()
842 }
843
844 pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
847 if config.api_id == 0 {
849 return Err(InvocationError::Deserialize(
850 "api_id must be non-zero".into(),
851 ));
852 }
853 if config.api_hash.is_empty() {
854 return Err(InvocationError::Deserialize(
855 "api_hash must not be empty".into(),
856 ));
857 }
858
859 let (update_tx, update_rx) = mpsc::channel(2048);
862
863 let socks5 = config.socks5.clone();
865 let mtproxy = config.mtproxy.clone();
866 let transport = config.transport.clone();
867
868 let (conn, home_dc_id, dc_opts, media_dc_opts, loaded_session) =
869 match config.session_backend.load().map_err(InvocationError::Io)? {
870 Some(s) => {
871 if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
872 if let Some(key) = dc.auth_key {
873 tracing::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
874 match Connection::connect_with_key(
875 &dc.addr,
876 key,
877 dc.first_salt,
878 dc.time_offset,
879 socks5.as_ref(),
880 &transport,
881 s.home_dc_id as i16,
882 )
883 .await
884 {
885 Ok(c) => {
886 let mut opts = session::default_dc_addresses()
887 .into_iter()
888 .map(|(id, addr)| {
889 (
890 id,
891 DcEntry {
892 dc_id: id,
893 addr,
894 auth_key: None,
895 first_salt: 0,
896 time_offset: 0,
897 flags: DcFlags::NONE,
898 },
899 )
900 })
901 .collect::<HashMap<_, _>>();
902 let mut media_opts: HashMap<i32, DcEntry> = HashMap::new();
903 for d in &s.dcs {
904 if d.flags.contains(DcFlags::MEDIA_ONLY)
905 || d.flags.contains(DcFlags::CDN)
906 {
907 media_opts.insert(d.dc_id, d.clone());
908 } else {
909 opts.insert(d.dc_id, d.clone());
910 }
911 }
912 (c, s.home_dc_id, opts, media_opts, Some(s))
913 }
914 Err(e) => {
915 tracing::warn!(
921 "[layer] Session connect failed ({e}): \
922 returning error (delete session file to reset)"
923 );
924 return Err(e);
925 }
926 }
927 } else {
928 let (c, dc, opts) =
929 Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport)
930 .await?;
931 (c, dc, opts, HashMap::new(), None)
932 }
933 } else {
934 let (c, dc, opts) =
935 Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport)
936 .await?;
937 (c, dc, opts, HashMap::new(), None)
938 }
939 }
940 None => {
941 let (c, dc, opts) =
942 Self::fresh_connect(socks5.as_ref(), mtproxy.as_ref(), &transport).await?;
943 (c, dc, opts, HashMap::new(), None)
944 }
945 };
946
947 let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());
949
950 let (writer, write_half, read_half, frame_kind) = conn.into_writer();
955 let auth_key = writer.enc.auth_key_bytes();
956 let session_id = writer.enc.session_id();
957
958 #[allow(clippy::type_complexity)]
959 let pending: Arc<
960 Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>,
961 > = Arc::new(Mutex::new(HashMap::new()));
962
963 let (reconnect_tx, reconnect_rx) =
965 mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();
966
967 let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();
970
971 let shutdown_token = CancellationToken::new();
973 let catch_up = config.catch_up;
974 let restart_policy = config.restart_policy;
975
976 let inner = Arc::new(ClientInner {
977 writer: Mutex::new(writer),
978 write_half: Mutex::new(write_half),
979 pending: pending.clone(),
980 reconnect_tx,
981 network_hint_tx,
982 shutdown_token: shutdown_token.clone(),
983 catch_up,
984 restart_policy,
985 home_dc_id: Mutex::new(home_dc_id),
986 dc_options: Mutex::new(dc_opts),
987 media_dc_options: Mutex::new(media_dc_opts),
988 peer_cache: RwLock::new(PeerCache::default()),
989 pts_state: Mutex::new(pts::PtsState::default()),
990 possible_gap: Mutex::new(pts::PossibleGapBuffer::new()),
991 api_id: config.api_id,
992 api_hash: config.api_hash,
993 retry_policy: config.retry_policy,
994 socks5: config.socks5,
995 mtproxy: config.mtproxy,
996 allow_ipv6: config.allow_ipv6,
997 transport: config.transport,
998 session_backend: config.session_backend,
999 dc_pool: Mutex::new(pool),
1000 update_tx,
1001 is_bot: std::sync::atomic::AtomicBool::new(false),
1002 stream_active: std::sync::atomic::AtomicBool::new(false),
1003 salt_request_in_flight: std::sync::atomic::AtomicBool::new(false),
1004 dh_in_progress: std::sync::atomic::AtomicBool::new(false),
1005 });
1006
1007 let client = Self {
1008 inner,
1009 _update_rx: Arc::new(Mutex::new(update_rx)),
1010 };
1011
1012 {
1015 let client_r = client.clone();
1016 let shutdown_r = shutdown_token.clone();
1017 tokio::spawn(async move {
1018 client_r
1019 .run_reader_task(
1020 read_half,
1021 frame_kind,
1022 auth_key,
1023 session_id,
1024 reconnect_rx,
1025 network_hint_rx,
1026 shutdown_r,
1027 )
1028 .await;
1029 });
1030 }
1031
1032 if let Err(e) = client.init_connection().await {
1037 let key_is_stale = match &e {
1038 InvocationError::Rpc(r) if r.code == -404 => true,
1039 InvocationError::Rpc(r) if r.code == -429 => false,
1045 InvocationError::Io(io)
1046 if io.kind() == std::io::ErrorKind::UnexpectedEof
1047 || io.kind() == std::io::ErrorKind::ConnectionReset =>
1048 {
1049 true
1050 }
1051 _ => false,
1052 };
1053
1054 let dh_allowed = key_is_stale
1058 && client
1059 .inner
1060 .dh_in_progress
1061 .compare_exchange(
1062 false,
1063 true,
1064 std::sync::atomic::Ordering::SeqCst,
1065 std::sync::atomic::Ordering::SeqCst,
1066 )
1067 .is_ok();
1068
1069 if dh_allowed {
1070 tracing::warn!("[layer] init_connection: definitive bad-key ({e}), fresh DH …");
1071 {
1072 let home_dc_id = *client.inner.home_dc_id.lock().await;
1073 let mut opts = client.inner.dc_options.lock().await;
1074 if let Some(entry) = opts.get_mut(&home_dc_id)
1075 && entry.auth_key.is_some()
1076 {
1077 tracing::warn!("[layer] Clearing stale auth key for DC{home_dc_id}");
1078 entry.auth_key = None;
1079 entry.first_salt = 0;
1080 entry.time_offset = 0;
1081 }
1082 }
1083 client.save_session().await.ok();
1084 client.inner.pending.lock().await.clear();
1085
1086 let socks5_r = client.inner.socks5.clone();
1087 let mtproxy_r = client.inner.mtproxy.clone();
1088 let transport_r = client.inner.transport.clone();
1089
1090 let home_dc_id_r = *client.inner.home_dc_id.lock().await;
1094 let addr_r = {
1095 let opts = client.inner.dc_options.lock().await;
1096 opts.get(&home_dc_id_r)
1097 .map(|e| e.addr.clone())
1098 .unwrap_or_else(|| {
1099 crate::dc_migration::fallback_dc_addr(home_dc_id_r).to_string()
1100 })
1101 };
1102 let new_conn = Connection::connect_raw(
1103 &addr_r,
1104 socks5_r.as_ref(),
1105 mtproxy_r.as_ref(),
1106 &transport_r,
1107 home_dc_id_r as i16,
1108 )
1109 .await?;
1110
1111 let (new_writer, new_wh, new_read, new_fk) = new_conn.into_writer();
1113 {
1115 let mut opts_guard = client.inner.dc_options.lock().await;
1116 if let Some(entry) = opts_guard.get_mut(&home_dc_id_r) {
1117 entry.auth_key = Some(new_writer.auth_key_bytes());
1118 entry.first_salt = new_writer.first_salt();
1119 entry.time_offset = new_writer.time_offset();
1120 }
1121 }
1122 let new_ak = new_writer.enc.auth_key_bytes();
1124 let new_sid = new_writer.enc.session_id();
1125 *client.inner.writer.lock().await = new_writer;
1126 *client.inner.write_half.lock().await = new_wh;
1127 let _ = client
1128 .inner
1129 .reconnect_tx
1130 .send((new_read, new_fk, new_ak, new_sid));
1131 tokio::task::yield_now().await;
1132
1133 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1137
1138 client.init_connection().await?;
1139 client
1140 .inner
1141 .dh_in_progress
1142 .store(false, std::sync::atomic::Ordering::SeqCst);
1143 client.save_session().await.ok();
1145
1146 tracing::warn!(
1147 "[layer] Session invalidated and reset. \
1148 Call is_authorized() and re-authenticate if needed."
1149 );
1150 } else {
1151 return Err(e);
1152 }
1153 }
1154
1155 if let Some(ref s) = loaded_session
1157 && !s.peers.is_empty()
1158 {
1159 let mut cache = client.inner.peer_cache.write().await;
1160 for p in &s.peers {
1161 if p.is_channel {
1162 cache.channels.entry(p.id).or_insert(p.access_hash);
1163 } else {
1164 cache.users.entry(p.id).or_insert(p.access_hash);
1165 }
1166 }
1167 tracing::debug!(
1168 "[layer] Peer cache restored: {} users, {} channels",
1169 cache.users.len(),
1170 cache.channels.len()
1171 );
1172 }
1173
1174 let has_saved_state = loaded_session
1184 .as_ref()
1185 .is_some_and(|s| s.updates_state.is_initialised());
1186
1187 if catch_up && has_saved_state {
1188 let snap = &loaded_session.as_ref().unwrap().updates_state;
1189 let mut state = client.inner.pts_state.lock().await;
1190 state.pts = snap.pts;
1191 state.qts = snap.qts;
1192 state.date = snap.date;
1193 state.seq = snap.seq;
1194 for &(cid, cpts) in &snap.channels {
1195 state.channel_pts.insert(cid, cpts);
1196 }
1197 tracing::info!(
1198 "[layer] Update state restored: pts={}, qts={}, seq={}, {} channels",
1199 state.pts,
1200 state.qts,
1201 state.seq,
1202 state.channel_pts.len()
1203 );
1204 drop(state);
1205
1206 let channel_ids: Vec<i64> = snap.channels.iter().map(|&(cid, _)| cid).collect();
1210
1211 let c = client.clone();
1214 let utx = client.inner.update_tx.clone();
1215 tokio::spawn(async move {
1216 match c.get_difference().await {
1218 Ok(missed) => {
1219 tracing::info!(
1220 "[layer] catch_up: {} global updates replayed",
1221 missed.len()
1222 );
1223 for u in missed {
1224 if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1225 tracing::warn!(
1226 "[layer] update channel full: dropping catch-up update"
1227 );
1228 break;
1229 }
1230 }
1231 }
1232 Err(e) => tracing::warn!("[layer] catch_up getDifference: {e}"),
1233 }
1234
1235 if !channel_ids.is_empty() {
1240 tracing::info!(
1241 "[layer] catch_up: per-channel diff for {} channels",
1242 channel_ids.len()
1243 );
1244 let sem = std::sync::Arc::new(tokio::sync::Semaphore::new(10));
1245 for channel_id in channel_ids {
1246 let c2 = c.clone();
1247 let utx2 = utx.clone();
1248 let permit = sem.clone().acquire_owned().await.unwrap();
1249 tokio::spawn(async move {
1250 let _permit = permit; match c2.get_channel_difference(channel_id).await {
1252 Ok(updates) => {
1253 if !updates.is_empty() {
1254 tracing::debug!(
1255 "[layer] catch_up channel {channel_id}: {} updates",
1256 updates.len()
1257 );
1258 }
1259 for u in updates {
1260 if utx2.try_send(u).is_err() {
1261 tracing::warn!(
1262 "[layer] update channel full: dropping channel diff update"
1263 );
1264 break;
1265 }
1266 }
1267 }
1268 Err(e) => {
1269 tracing::warn!("[layer] catch_up channel {channel_id}: {e}")
1270 }
1271 }
1272 });
1273 }
1274 }
1275 });
1276 } else {
1277 let _ = client.sync_pts_state().await;
1279 }
1280
1281 Ok((client, shutdown_token))
1282 }
1283
1284 async fn fresh_connect(
1285 socks5: Option<&crate::socks5::Socks5Config>,
1286 mtproxy: Option<&crate::proxy::MtProxyConfig>,
1287 transport: &TransportKind,
1288 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
1289 tracing::debug!("[layer] Fresh connect to DC2 …");
1290 let conn = Connection::connect_raw(
1291 crate::dc_migration::fallback_dc_addr(2),
1292 socks5,
1293 mtproxy,
1294 transport,
1295 2i16,
1296 )
1297 .await?;
1298 let opts = session::default_dc_addresses()
1299 .into_iter()
1300 .map(|(id, addr)| {
1301 (
1302 id,
1303 DcEntry {
1304 dc_id: id,
1305 addr,
1306 auth_key: None,
1307 first_salt: 0,
1308 time_offset: 0,
1309 flags: DcFlags::NONE,
1310 },
1311 )
1312 })
1313 .collect();
1314 Ok((conn, 2, opts))
1315 }
1316
1317 async fn build_persisted_session(&self) -> PersistedSession {
1325 use session::{CachedPeer, UpdatesStateSnap};
1326
1327 let writer_guard = self.inner.writer.lock().await;
1328 let home_dc_id = *self.inner.home_dc_id.lock().await;
1329 let dc_options = self.inner.dc_options.lock().await;
1330
1331 let mut dcs: Vec<DcEntry> = dc_options
1332 .values()
1333 .map(|e| DcEntry {
1334 dc_id: e.dc_id,
1335 addr: e.addr.clone(),
1336 auth_key: if e.dc_id == home_dc_id {
1337 Some(writer_guard.auth_key_bytes())
1338 } else {
1339 e.auth_key
1340 },
1341 first_salt: if e.dc_id == home_dc_id {
1342 writer_guard.first_salt()
1343 } else {
1344 e.first_salt
1345 },
1346 time_offset: if e.dc_id == home_dc_id {
1347 writer_guard.time_offset()
1348 } else {
1349 e.time_offset
1350 },
1351 flags: e.flags,
1352 })
1353 .collect();
1354 {
1356 let media_opts = self.inner.media_dc_options.lock().await;
1357 for e in media_opts.values() {
1358 dcs.push(e.clone());
1359 }
1360 }
1361 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
1362
1363 let pts_snap = {
1364 let s = self.inner.pts_state.lock().await;
1365 UpdatesStateSnap {
1366 pts: s.pts,
1367 qts: s.qts,
1368 date: s.date,
1369 seq: s.seq,
1370 channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
1371 }
1372 };
1373
1374 let peers: Vec<CachedPeer> = {
1375 let cache = self.inner.peer_cache.read().await;
1376 let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
1377 for (&id, &hash) in &cache.users {
1378 v.push(CachedPeer {
1379 id,
1380 access_hash: hash,
1381 is_channel: false,
1382 });
1383 }
1384 for (&id, &hash) in &cache.channels {
1385 v.push(CachedPeer {
1386 id,
1387 access_hash: hash,
1388 is_channel: true,
1389 });
1390 }
1391 v
1392 };
1393
1394 PersistedSession {
1395 home_dc_id,
1396 dcs,
1397 updates_state: pts_snap,
1398 peers,
1399 }
1400 }
1401
1402 pub async fn save_session(&self) -> Result<(), InvocationError> {
1404 let session = self.build_persisted_session().await;
1405 self.inner
1406 .session_backend
1407 .save(&session)
1408 .map_err(InvocationError::Io)?;
1409 tracing::debug!("[layer] Session saved ✓");
1410 Ok(())
1411 }
1412
1413 pub async fn export_session_string(&self) -> Result<String, InvocationError> {
1420 Ok(self.build_persisted_session().await.to_string())
1421 }
1422
1423 pub async fn media_dc_addr(&self, dc_id: i32) -> Option<String> {
1429 self.inner
1430 .media_dc_options
1431 .lock()
1432 .await
1433 .get(&dc_id)
1434 .map(|e| e.addr.clone())
1435 }
1436
1437 pub async fn best_media_dc_addr(&self) -> Option<(i32, String)> {
1440 let home = *self.inner.home_dc_id.lock().await;
1441 let media = self.inner.media_dc_options.lock().await;
1442 media
1443 .get(&home)
1444 .map(|e| (home, e.addr.clone()))
1445 .or_else(|| media.iter().next().map(|(&id, e)| (id, e.addr.clone())))
1446 }
1447
1448 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
1450 match self.invoke(&tl::functions::updates::GetState {}).await {
1451 Ok(_) => Ok(true),
1452 Err(e)
1453 if e.is("AUTH_KEY_UNREGISTERED")
1454 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) =>
1455 {
1456 Ok(false)
1457 }
1458 Err(e) => Err(e),
1459 }
1460 }
1461
1462 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
1464 let req = tl::functions::auth::ImportBotAuthorization {
1465 flags: 0,
1466 api_id: self.inner.api_id,
1467 api_hash: self.inner.api_hash.clone(),
1468 bot_auth_token: token.to_string(),
1469 };
1470
1471 let result = self.invoke(&req).await?;
1472
1473 let name = match result {
1474 tl::enums::auth::Authorization::Authorization(a) => {
1475 self.cache_user(&a.user).await;
1476 Self::extract_user_name(&a.user)
1477 }
1478 tl::enums::auth::Authorization::SignUpRequired(_) => {
1479 return Err(InvocationError::Deserialize(
1480 "unexpected SignUpRequired during bot sign-in".into(),
1481 ));
1482 }
1483 };
1484 tracing::info!("[layer] Bot signed in ✓ ({name})");
1485 self.inner
1486 .is_bot
1487 .store(true, std::sync::atomic::Ordering::Relaxed);
1488 Ok(name)
1489 }
1490
1491 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
1493 use tl::enums::auth::SentCode;
1494
1495 let req = self.make_send_code_req(phone);
1496 let body = self.rpc_call_raw(&req).await?;
1497
1498 let mut cur = Cursor::from_slice(&body);
1499 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
1500 SentCode::SentCode(s) => s.phone_code_hash,
1501 SentCode::Success(_) => {
1502 return Err(InvocationError::Deserialize("unexpected Success".into()));
1503 }
1504 SentCode::PaymentRequired(_) => {
1505 return Err(InvocationError::Deserialize(
1506 "payment required to send code".into(),
1507 ));
1508 }
1509 };
1510 tracing::info!("[layer] Login code sent");
1511 Ok(LoginToken {
1512 phone: phone.to_string(),
1513 phone_code_hash: hash,
1514 })
1515 }
1516
1517 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
1519 let req = tl::functions::auth::SignIn {
1520 phone_number: token.phone.clone(),
1521 phone_code_hash: token.phone_code_hash.clone(),
1522 phone_code: Some(code.trim().to_string()),
1523 email_verification: None,
1524 };
1525
1526 let body = match self.rpc_call_raw(&req).await {
1527 Ok(b) => b,
1528 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
1529 let t = self.get_password_info().await.map_err(SignInError::Other)?;
1530 return Err(SignInError::PasswordRequired(Box::new(t)));
1531 }
1532 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
1533 Err(e) => return Err(SignInError::Other(e)),
1534 };
1535
1536 let mut cur = Cursor::from_slice(&body);
1537 match tl::enums::auth::Authorization::deserialize(&mut cur)
1538 .map_err(|e| SignInError::Other(e.into()))?
1539 {
1540 tl::enums::auth::Authorization::Authorization(a) => {
1541 self.cache_user(&a.user).await;
1542 let name = Self::extract_user_name(&a.user);
1543 tracing::info!("[layer] Signed in ✓ Welcome, {name}!");
1544 Ok(name)
1545 }
1546 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
1547 }
1548 }
1549
1550 pub async fn check_password(
1552 &self,
1553 token: PasswordToken,
1554 password: impl AsRef<[u8]>,
1555 ) -> Result<String, InvocationError> {
1556 let pw = token.password;
1557 let algo = pw
1558 .current_algo
1559 .ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
1560 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
1561 let g_b = pw
1562 .srp_b
1563 .ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
1564 let a = pw.secure_random;
1565 let srp_id = pw
1566 .srp_id
1567 .ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
1568
1569 let (m1, g_a) =
1570 two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
1571 let req = tl::functions::auth::CheckPassword {
1572 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
1573 tl::types::InputCheckPasswordSrp {
1574 srp_id,
1575 a: g_a.to_vec(),
1576 m1: m1.to_vec(),
1577 },
1578 ),
1579 };
1580
1581 let body = self.rpc_call_raw(&req).await?;
1582 let mut cur = Cursor::from_slice(&body);
1583 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
1584 tl::enums::auth::Authorization::Authorization(a) => {
1585 self.cache_user(&a.user).await;
1586 let name = Self::extract_user_name(&a.user);
1587 tracing::info!("[layer] 2FA ✓ Welcome, {name}!");
1588 Ok(name)
1589 }
1590 tl::enums::auth::Authorization::SignUpRequired(_) => Err(InvocationError::Deserialize(
1591 "unexpected SignUpRequired after 2FA".into(),
1592 )),
1593 }
1594 }
1595
1596 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
1598 let req = tl::functions::auth::LogOut {};
1599 match self.rpc_call_raw(&req).await {
1600 Ok(_) => {
1601 tracing::info!("[layer] Signed out ✓");
1602 Ok(true)
1603 }
1604 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
1605 Err(e) => Err(e),
1606 }
1607 }
1608
1609 pub async fn get_users_by_id(
1617 &self,
1618 ids: &[i64],
1619 ) -> Result<Vec<Option<crate::types::User>>, InvocationError> {
1620 let cache = self.inner.peer_cache.read().await;
1621 let input_ids: Vec<tl::enums::InputUser> = ids
1622 .iter()
1623 .map(|&id| {
1624 if id == 0 {
1625 tl::enums::InputUser::UserSelf
1626 } else {
1627 let hash = cache.users.get(&id).copied().unwrap_or(0);
1628 tl::enums::InputUser::InputUser(tl::types::InputUser {
1629 user_id: id,
1630 access_hash: hash,
1631 })
1632 }
1633 })
1634 .collect();
1635 drop(cache);
1636 let req = tl::functions::users::GetUsers { id: input_ids };
1637 let body = self.rpc_call_raw(&req).await?;
1638 let mut cur = Cursor::from_slice(&body);
1639 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1640 self.cache_users_slice(&users).await;
1641 Ok(users
1642 .into_iter()
1643 .map(crate::types::User::from_raw)
1644 .collect())
1645 }
1646
1647 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
1649 let req = tl::functions::users::GetUsers {
1650 id: vec![tl::enums::InputUser::UserSelf],
1651 };
1652 let body = self.rpc_call_raw(&req).await?;
1653 let mut cur = Cursor::from_slice(&body);
1654 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1655 self.cache_users_slice(&users).await;
1656 users
1657 .into_iter()
1658 .find_map(|u| match u {
1659 tl::enums::User::User(u) => Some(u),
1660 _ => None,
1661 })
1662 .ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
1663 }
1664
1665 pub fn stream_updates(&self) -> UpdateStream {
1673 if self
1677 .inner
1678 .stream_active
1679 .swap(true, std::sync::atomic::Ordering::SeqCst)
1680 {
1681 panic!(
1682 "stream_updates() called twice on the same Client: only one UpdateStream is supported per client"
1683 );
1684 }
1685 let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
1686 let internal_rx = self._update_rx.clone();
1687 tokio::spawn(async move {
1688 let mut guard = internal_rx.lock().await;
1689 while let Some(upd) = guard.recv().await {
1690 if caller_tx.send(upd).is_err() {
1691 break;
1692 }
1693 }
1694 });
1695 UpdateStream { rx }
1696 }
1697
1698 pub fn signal_network_restored(&self) {
1711 let _ = self.inner.network_hint_tx.send(());
1712 }
1713
/// Supervisor around [`Self::reader_loop`].
///
/// Runs the reader loop on the given connection until `shutdown_token` is
/// cancelled. Whenever the loop returns without a shutdown having been
/// requested, the supervisor:
///
/// 1. fails every pending request with an I/O `ConnectionReset` error and
///    clears the writer's `sent_bodies`,
/// 2. reconnects via `do_reconnect`, sleeping between attempts with a delay
///    that doubles (capped at `RECONNECT_MAX_SECS`) and is jittered,
/// 3. spawns a task that runs `init_connection` and then catches up on
///    missed updates, and
/// 4. restarts the reader loop on the new connection, handing it the oneshot
///    receiver that will carry the init result.
///
/// `new_conn_rx` and `network_hint_rx` are only passed through to the reader
/// loop. The function returns only when shutdown is requested.
#[allow(clippy::too_many_arguments)]
async fn run_reader_task(
    &self,
    read_half: OwnedReadHalf,
    frame_kind: FrameKind,
    auth_key: [u8; 256],
    session_id: i64,
    mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    mut network_hint_rx: mpsc::UnboundedReceiver<()>,
    shutdown_token: CancellationToken,
) {
    // Connection state for the next reader-loop run; replaced after every
    // successful supervisor reconnect.
    let mut rh = read_half;
    let mut fk = frame_kind;
    let mut ak = auth_key;
    let mut sid = session_id;
    // Result channel of the init task spawned after a supervisor reconnect.
    // `None` for the very first run.
    let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
    // Number of times the reader loop has been restarted; used in log output.
    let mut restart_count: u32 = 0;

    loop {
        tokio::select! {
            // Shutdown: fail all in-flight requests with `Dropped` and stop.
            _ = shutdown_token.cancelled() => {
                tracing::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
                let mut pending = self.inner.pending.lock().await;
                for (_, tx) in pending.drain() {
                    let _ = tx.send(Err(InvocationError::Dropped));
                }
                return;
            }

            // Normal operation: the reader loop owns the connection until it
            // returns.
            _ = self.reader_loop(
                rh, fk, ak, sid,
                restart_init_rx.take(),
                &mut new_conn_rx, &mut network_hint_rx,
            ) => {}
        }

        // The loop returned on its own. If shutdown was requested in the
        // meantime, exit instead of reconnecting.
        // NOTE(review): unlike the select branch above, this exit does not
        // drain `pending`; confirm that is intended.
        if shutdown_token.is_cancelled() {
            tracing::debug!("[layer] Reader task: exiting after loop (shutdown).");
            return;
        }

        restart_count += 1;
        tracing::error!(
            "[layer] Reader loop exited unexpectedly (restart #{restart_count}): supervisor reconnecting …"
        );

        // Fail every request that was waiting on the old connection.
        {
            let mut pending = self.inner.pending.lock().await;
            for (_, tx) in pending.drain() {
                let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
                    std::io::ErrorKind::ConnectionReset,
                    "reader task restarted",
                ))));
            }
        }
        // The stored request bodies belong to the old connection as well.
        self.inner.writer.lock().await.sent_bodies.clear();

        // Reconnect with back-off; each sleep can be interrupted by shutdown.
        let mut delay_ms = RECONNECT_BASE_MS;
        let new_conn = loop {
            tracing::debug!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
            tokio::select! {
                _ = shutdown_token.cancelled() => {
                    tracing::debug!("[layer] Supervisor: shutdown during reconnect, exiting.");
                    return;
                }
                _ = sleep(Duration::from_millis(delay_ms)) => {}
            }

            // Placeholder arguments: the values actually used afterwards come
            // from the tuple `do_reconnect` returns.
            // NOTE(review): presumably `do_reconnect` ignores these inputs on
            // this path; confirm.
            let dummy_ak = [0u8; 256];
            let dummy_fk = FrameKind::Abridged;
            match self.do_reconnect(&dummy_ak, &dummy_fk).await {
                Ok(conn) => break conn,
                Err(e) => {
                    tracing::warn!("[layer] Supervisor: reconnect failed ({e})");
                    // Double the delay, cap it, then apply jitter.
                    let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
                    delay_ms = jitter_delay(next).as_millis() as u64;
                }
            }
        };

        let (new_rh, new_fk, new_ak, new_sid) = new_conn;
        rh = new_rh;
        fk = new_fk;
        ak = new_ak;
        sid = new_sid;

        // Initialise the new connection in a separate task so the reader loop
        // can start reading immediately; the result arrives through `init_rx`.
        let (init_tx, init_rx) = oneshot::channel();
        let c = self.clone();
        let utx = self.inner.update_tx.clone();
        tokio::spawn(async move {
            // Retry `init_connection` for as long as the server answers with a
            // flood wait, sleeping one second longer than requested.
            let result = loop {
                match c.init_connection().await {
                    Ok(()) => break Ok(()),
                    Err(InvocationError::Rpc(ref r)) if r.flood_wait_seconds().is_some() => {
                        // Cannot fail: the guard above checked `is_some()`.
                        let secs = r.flood_wait_seconds().unwrap();
                        tracing::warn!(
                            "[layer] Supervisor init_connection FLOOD_WAIT_{secs}: waiting"
                        );
                        sleep(Duration::from_secs(secs + 1)).await;
                    }
                    Err(e) => break Err(e),
                }
            };
            if result.is_ok() {
                // If the `dh_in_progress` flag is set, wait two seconds before
                // asking for the difference.
                if c.inner
                    .dh_in_progress
                    .load(std::sync::atomic::Ordering::SeqCst)
                {
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                }
                // Catch up on updates missed while disconnected. Failures are
                // logged and treated as "nothing missed".
                let missed = match c.get_difference().await {
                    Ok(updates) => updates,
                    // RPC code 401: resync the pts state instead.
                    Err(ref e)
                        if matches!(e,
                            InvocationError::Rpc(r) if r.code == 401) =>
                    {
                        tracing::warn!(
                            "[layer] getDifference AUTH_KEY_UNREGISTERED after \
                             fresh DH: falling back to sync_pts_state"
                        );
                        let _ = c.sync_pts_state().await;
                        vec![]
                    }
                    Err(e) => {
                        tracing::warn!("[layer] getDifference failed after reconnect: {e}");
                        vec![]
                    }
                };
                // Forward without blocking; stop at the first rejected update.
                for u in missed {
                    if utx.try_send(u).is_err() {
                        tracing::warn!("[layer] update channel full: dropping catch-up update");
                        break;
                    }
                }
            }
            // The receiver may already be gone; that is not an error here.
            let _ = init_tx.send(result);
        });
        restart_init_rx = Some(init_rx);

        tracing::debug!(
            "[layer] Supervisor: restarting reader loop (restart #{restart_count}) …"
        );
    }
}
1909
1910 #[allow(clippy::too_many_arguments)]
1911 async fn reader_loop(
1912 &self,
1913 mut rh: OwnedReadHalf,
1914 mut fk: FrameKind,
1915 mut ak: [u8; 256],
1916 mut sid: i64,
1917 initial_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>>,
1920 new_conn_rx: &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1921 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
1922 ) {
1923 let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
1928 let mut init_fail_count: u32 = 0;
1933
1934 let mut gap_tick = tokio::time::interval(std::time::Duration::from_millis(1500));
1935 gap_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1936
1937 let mut restart_interval = self.inner.restart_policy.restart_interval().map(|d| {
1938 let mut i = tokio::time::interval(d);
1939 i.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1940 i
1941 });
1942 if let Some(ref mut i) = restart_interval {
1943 i.tick().await;
1944 }
1945
1946 loop {
1947 tokio::select! {
1948 _ = gap_tick.tick() => {
1952 if self.inner.possible_gap.lock().await.has_global() {
1957 let gap_expired = self.inner.possible_gap.lock().await.global_deadline_elapsed();
1958 if gap_expired {
1959 let c = self.clone();
1960 tokio::spawn(async move {
1961 if let Err(e) = c.check_update_deadline().await {
1962 tracing::warn!("[layer] gap tick getDifference: {e}");
1963 }
1964 });
1965 }
1966 }
1967 }
1968 _ = async {
1969 if let Some(ref mut i) = restart_interval { i.tick().await; }
1970 else { std::future::pending::<()>().await; }
1971 } => {
1972 tracing::info!("[layer] scheduled restart: reconnecting");
1973 let _ = self.inner.write_half.lock().await.shutdown().await;
1974 let _ = self.inner.network_hint_tx.send(());
1975 }
1976 outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
1978 match outcome {
1979 FrameOutcome::Frame(mut raw) => {
1980 let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
1981 Ok(m) => m,
1982 Err(e) => {
1983 tracing::warn!("[layer] Decrypt error: {e:?}: failing pending waiters and reconnecting");
1989 drop(init_rx.take());
1990 {
1991 let mut pending = self.inner.pending.lock().await;
1992 let msg = format!("decrypt error: {e}");
1993 for (_, tx) in pending.drain() {
1994 let _ = tx.send(Err(InvocationError::Io(
1995 std::io::Error::new(
1996 std::io::ErrorKind::InvalidData,
1997 msg.clone(),
1998 )
1999 )));
2000 }
2001 }
2002 self.inner.writer.lock().await.sent_bodies.clear();
2003 match self.do_reconnect_loop(
2004 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
2005 network_hint_rx,
2006 ).await {
2007 Some(rx) => { init_rx = Some(rx); }
2008 None => return,
2009 }
2010 continue;
2011 }
2012 };
2013 self.route_frame(msg.body, msg.msg_id).await;
2018
2019 }
2024
2025 FrameOutcome::Error(e) => {
2026 tracing::warn!("[layer] Reader: connection error: {e}");
2027 drop(init_rx.take()); let key_is_stale = match &e {
2040 InvocationError::Rpc(r) if r.code == -404 => true,
2041 InvocationError::Rpc(r) if r.code == -429 => false,
2042 InvocationError::Io(io)
2043 if io.kind() == std::io::ErrorKind::UnexpectedEof
2044 || io.kind() == std::io::ErrorKind::ConnectionReset => true,
2045 _ => false,
2046 };
2047 let clear_key = key_is_stale
2051 && self.inner.dh_in_progress
2052 .compare_exchange(false, true,
2053 std::sync::atomic::Ordering::SeqCst,
2054 std::sync::atomic::Ordering::SeqCst)
2055 .is_ok();
2056 if clear_key {
2057 let home_dc_id = *self.inner.home_dc_id.lock().await;
2058 let mut opts = self.inner.dc_options.lock().await;
2059 if let Some(entry) = opts.get_mut(&home_dc_id) {
2060 tracing::warn!(
2061 "[layer] Stale auth key on DC{home_dc_id} ({e}) \
2062 : clearing for fresh DH"
2063 );
2064 entry.auth_key = None;
2065 }
2066 }
2067
2068 {
2071 let mut pending = self.inner.pending.lock().await;
2072 let msg = e.to_string();
2073 for (_, tx) in pending.drain() {
2074 let _ = tx.send(Err(InvocationError::Io(
2075 std::io::Error::new(
2076 std::io::ErrorKind::ConnectionReset, msg.clone()))));
2077 }
2078 }
2079 self.inner.writer.lock().await.sent_bodies.clear();
2081
2082 let reconnect_delay = if clear_key { 0 } else { RECONNECT_BASE_MS };
2085 match self.do_reconnect_loop(
2086 reconnect_delay, &mut rh, &mut fk, &mut ak, &mut sid,
2087 network_hint_rx,
2088 ).await {
2089 Some(rx) => {
2090 self.inner.dh_in_progress
2093 .store(false, std::sync::atomic::Ordering::SeqCst);
2094 init_rx = Some(rx);
2095 }
2096 None => {
2097 self.inner.dh_in_progress
2098 .store(false, std::sync::atomic::Ordering::SeqCst);
2099 return; }
2101 }
2102 }
2103
2104 FrameOutcome::Keepalive => {
2105 let c = self.clone();
2109 tokio::spawn(async move {
2110 if let Err(e) = c.check_update_deadline().await {
2111 tracing::warn!("[layer] check_update_deadline: {e}");
2112 }
2113 });
2114 }
2115 }
2116 }
2117
2118 maybe = new_conn_rx.recv() => {
2120 if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
2121 rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
2122 tracing::debug!("[layer] Reader: switched to new connection.");
2123 } else {
2124 break; }
2126 }
2127
2128
2129 init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
2131 init_rx = None;
2132 match init_result {
2133 Ok(Ok(())) => {
2134 init_fail_count = 0;
2135 tracing::info!("[layer] Reconnected to Telegram ✓: session live, replaying missed updates …");
2144 }
2145
2146 Ok(Err(e)) => {
2147 let key_is_stale = match &e {
2151 InvocationError::Rpc(r) if r.code == -404 => true,
2152 InvocationError::Rpc(r) if r.code == -429 => false,
2153 InvocationError::Io(io) if io.kind() == std::io::ErrorKind::UnexpectedEof
2154 || io.kind() == std::io::ErrorKind::ConnectionReset => true,
2155 _ => false,
2156 };
2157 let dh_claimed = key_is_stale
2159 && self.inner.dh_in_progress
2160 .compare_exchange(false, true,
2161 std::sync::atomic::Ordering::SeqCst,
2162 std::sync::atomic::Ordering::SeqCst)
2163 .is_ok();
2164
2165 if dh_claimed {
2166 tracing::warn!(
2167 "[layer] init_connection: definitive bad-key ({e}) \
2168 : clearing auth key for fresh DH …"
2169 );
2170 init_fail_count = 0;
2171 let home_dc_id = *self.inner.home_dc_id.lock().await;
2172 let mut opts = self.inner.dc_options.lock().await;
2173 if let Some(entry) = opts.get_mut(&home_dc_id) {
2174 entry.auth_key = None;
2175 }
2176 } else {
2178 init_fail_count += 1;
2179 tracing::warn!(
2180 "[layer] init_connection failed (attempt {init_fail_count}, {e}) \
2181 : retrying with same key …"
2182 );
2183 }
2184 {
2185 let mut pending = self.inner.pending.lock().await;
2186 let msg = e.to_string();
2187 for (_, tx) in pending.drain() {
2188 let _ = tx.send(Err(InvocationError::Io(
2189 std::io::Error::new(
2190 std::io::ErrorKind::ConnectionReset, msg.clone()))));
2191 }
2192 }
2193 match self.do_reconnect_loop(
2194 0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
2195 ).await {
2196 Some(rx) => { init_rx = Some(rx); }
2197 None => return,
2198 }
2199 }
2200
2201 Err(_) => {
2202 tracing::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
2204 match self.do_reconnect_loop(
2205 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
2206 network_hint_rx,
2207 ).await {
2208 Some(rx) => { init_rx = Some(rx); }
2209 None => return,
2210 }
2211 }
2212 }
2213 }
2214 }
2215 }
2216 }
2217
2218 async fn route_frame(&self, body: Vec<u8>, msg_id: i64) {
2220 if body.len() < 4 {
2221 return;
2222 }
2223 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2224
2225 match cid {
2226 ID_RPC_RESULT => {
2227 if body.len() < 12 {
2228 return;
2229 }
2230 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2231 let inner = body[12..].to_vec();
2232 self.inner.writer.lock().await.pending_ack.push(msg_id);
2234 let result = unwrap_envelope(inner);
2235 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
2236 self.inner
2238 .writer
2239 .lock()
2240 .await
2241 .sent_bodies
2242 .remove(&req_msg_id);
2243 self.inner
2245 .writer
2246 .lock()
2247 .await
2248 .container_map
2249 .retain(|_, inner| *inner != req_msg_id);
2250 let to_send = match result {
2251 Ok(EnvelopeResult::Payload(p)) => Ok(p),
2252 Ok(EnvelopeResult::RawUpdates(bodies)) => {
2253 let c = self.clone();
2258 tokio::spawn(async move {
2259 for body in bodies {
2260 c.dispatch_updates(&body).await;
2261 }
2262 });
2263 Ok(vec![])
2264 }
2265 Ok(EnvelopeResult::Pts(pts, pts_count)) => {
2266 let c = self.clone();
2268 tokio::spawn(async move {
2269 match c.check_and_fill_gap(pts, pts_count, None).await {
2270 Ok(replayed) => {
2271 for u in replayed {
2273 let _ = c.inner.update_tx.try_send(u);
2274 }
2275 }
2276 Err(e) => tracing::warn!(
2277 "[layer] updateShortSentMessage pts advance: {e}"
2278 ),
2279 }
2280 });
2281 Ok(vec![])
2282 }
2283 Ok(EnvelopeResult::None) => Ok(vec![]),
2284 Err(e) => {
2285 tracing::debug!(
2286 "[layer] rpc_result deserialize failure for msg_id={req_msg_id}: {e}"
2287 );
2288 Err(e)
2289 }
2290 };
2291 let _ = tx.send(to_send);
2292 }
2293 }
2294 ID_RPC_ERROR => {
2295 tracing::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
2296 }
2297 ID_MSG_CONTAINER => {
2298 if body.len() < 8 {
2299 return;
2300 }
2301 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
2302 let mut pos = 8usize;
2303 for _ in 0..count {
2304 if pos + 16 > body.len() {
2305 break;
2306 }
2307 let inner_msg_id = i64::from_le_bytes(body[pos..pos + 8].try_into().unwrap());
2309 let inner_len =
2310 u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
2311 pos += 16;
2312 if pos + inner_len > body.len() {
2313 break;
2314 }
2315 let inner = body[pos..pos + inner_len].to_vec();
2316 pos += inner_len;
2317 Box::pin(self.route_frame(inner, inner_msg_id)).await;
2318 }
2319 }
2320 ID_GZIP_PACKED => {
2321 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
2322 if let Ok(inflated) = gz_inflate(&bytes) {
2323 Box::pin(self.route_frame(inflated, msg_id)).await;
2325 }
2326 }
2327 ID_BAD_SERVER_SALT => {
2328 if body.len() >= 28 {
2335 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2336 let new_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
2337
2338 {
2341 let mut w = self.inner.writer.lock().await;
2342 w.salts.clear();
2343 w.salts.push(FutureSalt {
2344 valid_since: 0,
2345 valid_until: i32::MAX,
2346 salt: new_salt,
2347 });
2348 w.enc.salt = new_salt;
2349 }
2350 tracing::debug!(
2351 "[layer] bad_server_salt: bad_msg_id={bad_msg_id} new_salt={new_salt:#x}"
2352 );
2353
2354 {
2359 let mut w = self.inner.writer.lock().await;
2360
2361 let resolved_id = if w.sent_bodies.contains_key(&bad_msg_id) {
2363 bad_msg_id
2364 } else if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2365 w.container_map.remove(&bad_msg_id);
2366 inner_id
2367 } else {
2368 bad_msg_id };
2370
2371 if let Some(orig_body) = w.sent_bodies.remove(&resolved_id) {
2372 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2373 let fk = w.frame_kind.clone();
2374 drop(w);
2377 let mut pending = self.inner.pending.lock().await;
2378 if let Some(tx) = pending.remove(&resolved_id) {
2379 pending.insert(new_msg_id, tx);
2380 drop(pending);
2381 if let Err(e) = send_frame_write(
2382 &mut *self.inner.write_half.lock().await,
2383 &wire,
2384 &fk,
2385 )
2386 .await
2387 {
2388 tracing::warn!("[layer] bad_server_salt re-send failed: {e}");
2389 } else {
2390 tracing::debug!(
2391 "[layer] bad_server_salt re-sent \
2392 {resolved_id}→{new_msg_id}"
2393 );
2394 }
2395 }
2396 } else {
2397 drop(w);
2400 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
2401 let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
2402 std::io::ErrorKind::InvalidData,
2403 "bad_server_salt on re-sent message; caller should retry",
2404 ))));
2405 }
2406 }
2407 }
2408
2409 self.spawn_salt_fetch_if_needed();
2411 }
2412 }
2413 ID_PONG => {
2414 if body.len() >= 20 {
2420 let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2421 self.inner.writer.lock().await.pending_ack.push(msg_id);
2423 if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
2424 let mut w = self.inner.writer.lock().await;
2425 w.sent_bodies.remove(&ping_msg_id);
2426 w.container_map.retain(|_, inner| *inner != ping_msg_id);
2427 drop(w);
2428 let _ = tx.send(Ok(body));
2429 }
2430 }
2431 }
2432 ID_FUTURE_SALTS => {
2434 self.inner.writer.lock().await.pending_ack.push(msg_id);
2448
2449 if body.len() >= 24 {
2450 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2451 let server_now = i32::from_le_bytes(body[12..16].try_into().unwrap());
2452 let count = u32::from_le_bytes(body[20..24].try_into().unwrap()) as usize;
2453
2454 let mut new_salts: Vec<FutureSalt> =
2457 Vec::with_capacity(count.clamp(0, 4096) as usize);
2458 for i in 0..count {
2459 let base = 24 + i * 16;
2460 if base + 16 > body.len() {
2461 break;
2462 }
2463 new_salts.push(FutureSalt {
2469 valid_since: i32::from_le_bytes(
2470 body[base..base + 4].try_into().unwrap(),
2471 ),
2472 salt: i64::from_le_bytes(body[base + 4..base + 12].try_into().unwrap()),
2473 valid_until: i32::from_le_bytes(
2474 body[base + 12..base + 16].try_into().unwrap(),
2475 ),
2476 });
2477 }
2478
2479 if !new_salts.is_empty() {
2480 new_salts.sort_by_key(|s| s.valid_since);
2483 let mut w = self.inner.writer.lock().await;
2484 w.salts = new_salts;
2485 w.start_salt_time = Some((server_now, std::time::Instant::now()));
2486
2487 let use_salt = w
2492 .salts
2493 .iter()
2494 .rev()
2495 .find(|s| s.valid_since + SALT_USE_DELAY <= server_now)
2496 .or_else(|| w.salts.first())
2497 .map(|s| s.salt);
2498 if let Some(salt) = use_salt {
2499 w.enc.salt = salt;
2500 tracing::debug!(
2501 "[layer] FutureSalts: stored {} salts, \
2502 active salt={salt:#x}",
2503 w.salts.len()
2504 );
2505 }
2506 }
2507
2508 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
2509 let mut w = self.inner.writer.lock().await;
2510 w.sent_bodies.remove(&req_msg_id);
2511 w.container_map.retain(|_, inner| *inner != req_msg_id);
2512 drop(w);
2513 let _ = tx.send(Ok(body));
2514 }
2515 }
2516 }
2517 ID_NEW_SESSION => {
2518 if body.len() >= 28 {
2523 let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
2524 let mut w = self.inner.writer.lock().await;
2525 w.pending_ack.push(msg_id);
2527 w.salts.clear();
2530 w.salts.push(FutureSalt {
2531 valid_since: 0,
2532 valid_until: i32::MAX,
2533 salt: server_salt,
2534 });
2535 w.enc.salt = server_salt;
2536 tracing::debug!(
2537 "[layer] new_session_created: salt pool reset to {server_salt:#x}"
2538 );
2539 }
2540 }
2541 ID_BAD_MSG_NOTIFY => {
2543 if body.len() < 20 {
2545 return;
2546 }
2547 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2548 let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
2549
2550 let description = match error_code {
2552 16 => "msg_id too low",
2553 17 => "msg_id too high",
2554 18 => "incorrect two lower order msg_id bits (bug)",
2555 19 => "container msg_id is same as previously received (bug)",
2556 20 => "message too old",
2557 32 => "msg_seqno too low",
2558 33 => "msg_seqno too high",
2559 34 => "even msg_seqno expected (bug)",
2560 35 => "odd msg_seqno expected (bug)",
2561 48 => "incorrect server salt",
2562 64 => "invalid container (bug)",
2563 _ => "unknown bad_msg code",
2564 };
2565
2566 let retryable = matches!(error_code, 16 | 17 | 48);
2568 let fatal = !retryable && !matches!(error_code, 32 | 33);
2569
2570 if fatal {
2571 tracing::error!(
2572 "[layer] bad_msg_notification (fatal): bad_msg_id={bad_msg_id} \
2573 code={error_code}: {description}"
2574 );
2575 } else {
2576 tracing::warn!(
2577 "[layer] bad_msg_notification: bad_msg_id={bad_msg_id} \
2578 code={error_code}: {description}"
2579 );
2580 }
2581
2582 let resend: Option<(Vec<u8>, i64, i64, FrameKind)> = {
2586 let mut w = self.inner.writer.lock().await;
2587
2588 if error_code == 16 || error_code == 17 {
2590 w.enc.correct_time_offset(msg_id);
2591 }
2592 if error_code == 32 || error_code == 33 {
2594 w.enc.correct_seq_no(error_code);
2595 }
2596
2597 if retryable {
2598 let resolved_id = if w.sent_bodies.contains_key(&bad_msg_id) {
2602 bad_msg_id
2603 } else if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2604 w.container_map.remove(&bad_msg_id);
2605 inner_id
2606 } else {
2607 bad_msg_id
2608 };
2609
2610 if let Some(orig_body) = w.sent_bodies.remove(&resolved_id) {
2611 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2612 let fk = w.frame_kind.clone();
2613 w.sent_bodies.insert(new_msg_id, orig_body);
2614 Some((wire, resolved_id, new_msg_id, fk))
2616 } else {
2617 None
2618 }
2619 } else {
2620 w.sent_bodies.remove(&bad_msg_id);
2622 if let Some(&inner_id) = w.container_map.get(&bad_msg_id) {
2623 w.sent_bodies.remove(&inner_id);
2624 w.container_map.remove(&bad_msg_id);
2625 }
2626 None
2627 }
2628 }; match resend {
2631 Some((wire, old_msg_id, new_msg_id, fk)) => {
2632 let has_waiter = {
2634 let mut pending = self.inner.pending.lock().await;
2635 if let Some(tx) = pending.remove(&old_msg_id) {
2636 pending.insert(new_msg_id, tx);
2637 true
2638 } else {
2639 false
2640 }
2641 };
2642 if has_waiter {
2643 if let Err(e) = send_frame_write(
2645 &mut *self.inner.write_half.lock().await,
2646 &wire,
2647 &fk,
2648 )
2649 .await
2650 {
2651 tracing::warn!("[layer] re-send failed: {e}");
2652 self.inner
2653 .writer
2654 .lock()
2655 .await
2656 .sent_bodies
2657 .remove(&new_msg_id);
2658 } else {
2659 tracing::debug!("[layer] re-sent {old_msg_id}→{new_msg_id}");
2660 }
2661 } else {
2662 self.inner
2663 .writer
2664 .lock()
2665 .await
2666 .sent_bodies
2667 .remove(&new_msg_id);
2668 }
2669 }
2670 None => {
2671 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
2673 let _ = tx.send(Err(InvocationError::Deserialize(format!(
2674 "bad_msg_notification code={error_code} ({description})"
2675 ))));
2676 }
2677 }
2678 }
2679 }
2680 ID_MSG_DETAILED_INFO => {
2682 if body.len() >= 20 {
2686 let answer_msg_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
2687 self.inner
2688 .writer
2689 .lock()
2690 .await
2691 .pending_ack
2692 .push(answer_msg_id);
2693 tracing::trace!(
2694 "[layer] MsgDetailedInfo: queued ack for answer_msg_id={answer_msg_id}"
2695 );
2696 }
2697 }
2698 ID_MSG_NEW_DETAIL_INFO => {
2699 if body.len() >= 12 {
2702 let answer_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
2703 self.inner
2704 .writer
2705 .lock()
2706 .await
2707 .pending_ack
2708 .push(answer_msg_id);
2709 tracing::trace!("[layer] MsgNewDetailedInfo: queued ack for {answer_msg_id}");
2710 }
2711 }
2712 ID_MSG_RESEND_REQ => {
2714 if body.len() >= 12 {
2719 let count = u32::from_le_bytes(body[8..12].try_into().unwrap()) as usize;
2720 let mut resends: Vec<(Vec<u8>, i64, i64)> = Vec::new();
2721 {
2722 let mut w = self.inner.writer.lock().await;
2723 let fk = w.frame_kind.clone();
2724 for i in 0..count {
2725 let off = 12 + i * 8;
2726 if off + 8 > body.len() {
2727 break;
2728 }
2729 let resend_id =
2730 i64::from_le_bytes(body[off..off + 8].try_into().unwrap());
2731 if let Some(orig_body) = w.sent_bodies.remove(&resend_id) {
2732 let (wire, new_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
2733 let mut pending = self.inner.pending.lock().await;
2734 if let Some(tx) = pending.remove(&resend_id) {
2735 pending.insert(new_id, tx);
2736 }
2737 drop(pending);
2738 w.sent_bodies.insert(new_id, orig_body);
2739 resends.push((wire, resend_id, new_id));
2740 }
2741 }
2742 let _ = fk; }
2744 let fk = self.inner.writer.lock().await.frame_kind.clone();
2746 for (wire, resend_id, new_id) in resends {
2747 send_frame_write(&mut *self.inner.write_half.lock().await, &wire, &fk)
2748 .await
2749 .ok();
2750 tracing::debug!("[layer] MsgResendReq: resent {resend_id} → {new_id}");
2751 }
2752 }
2753 }
2754 0xe22045fc => {
2756 tracing::warn!("[layer] destroy_session_ok received: session terminated by server");
2757 }
2758 0x62d350c9 => {
2759 tracing::warn!("[layer] destroy_session_none received: session was already gone");
2760 }
2761 ID_UPDATES
2762 | ID_UPDATE_SHORT
2763 | ID_UPDATES_COMBINED
2764 | ID_UPDATE_SHORT_MSG
2765 | ID_UPDATE_SHORT_CHAT_MSG
2766 | ID_UPDATE_SHORT_SENT_MSG
2767 | ID_UPDATES_TOO_LONG => {
2768 self.inner.writer.lock().await.pending_ack.push(msg_id);
2770 self.dispatch_updates(&body).await;
2772 }
2773 _ => {}
2774 }
2775 }
2776
2777 fn update_sort_key(upd: &tl::enums::Update) -> i32 {
2787 use tl::enums::Update::*;
2788 match upd {
2789 NewMessage(u) => u.pts - u.pts_count,
2790 EditMessage(u) => u.pts - u.pts_count,
2791 DeleteMessages(u) => u.pts - u.pts_count,
2792 ReadHistoryInbox(u) => u.pts - u.pts_count,
2793 ReadHistoryOutbox(u) => u.pts - u.pts_count,
2794 NewChannelMessage(u) => u.pts - u.pts_count,
2795 EditChannelMessage(u) => u.pts - u.pts_count,
2796 DeleteChannelMessages(u) => u.pts - u.pts_count,
2797 _ => 0,
2798 }
2799 }
2800
2801 async fn dispatch_updates(&self, body: &[u8]) {
2806 if body.len() < 4 {
2807 return;
2808 }
2809 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2810
2811 if cid == 0xe317af7e_u32 {
2813 tracing::warn!("[layer] updatesTooLong: getDifference");
2814 let c = self.clone();
2815 let utx = self.inner.update_tx.clone();
2816 tokio::spawn(async move {
2817 match c.get_difference().await {
2818 Ok(updates) => {
2819 for u in updates {
2820 if utx.try_send(u).is_err() {
2821 tracing::warn!("[layer] update channel full: dropping update");
2822 break;
2823 }
2824 }
2825 }
2826 Err(e) => tracing::warn!("[layer] getDifference after updatesTooLong: {e}"),
2827 }
2828 });
2829 return;
2830 }
2831
2832 if cid == 0x313bc7f8 {
2841 let mut cur = Cursor::from_slice(&body[4..]);
2843 let m = match tl::types::UpdateShortMessage::deserialize(&mut cur) {
2844 Ok(m) => m,
2845 Err(e) => {
2846 tracing::debug!("[layer] updateShortMessage deserialize error: {e}");
2847 return;
2848 }
2849 };
2850 let pts = m.pts;
2851 let pts_count = m.pts_count;
2852 let upd = update::Update::NewMessage(update::make_short_dm(m));
2853 let c = self.clone();
2854 let utx = self.inner.update_tx.clone();
2855 tokio::spawn(async move {
2856 match c
2857 .check_and_fill_gap(pts, pts_count, Some(attach_client_to_update(upd, &c)))
2858 .await
2859 {
2860 Ok(updates) => {
2861 for u in updates {
2862 if utx.try_send(u).is_err() {
2863 tracing::warn!("[layer] update channel full: dropping update");
2864 }
2865 }
2866 }
2867 Err(e) => tracing::warn!("[layer] updateShortMessage gap fill: {e}"),
2868 }
2869 });
2870 return;
2871 }
2872 if cid == 0x4d6deea5 {
2873 let mut cur = Cursor::from_slice(&body[4..]);
2875 let m = match tl::types::UpdateShortChatMessage::deserialize(&mut cur) {
2876 Ok(m) => m,
2877 Err(e) => {
2878 tracing::debug!("[layer] updateShortChatMessage deserialize error: {e}");
2879 return;
2880 }
2881 };
2882 let pts = m.pts;
2883 let pts_count = m.pts_count;
2884 let upd = update::Update::NewMessage(update::make_short_chat(m));
2885 let c = self.clone();
2886 let utx = self.inner.update_tx.clone();
2887 tokio::spawn(async move {
2888 match c
2889 .check_and_fill_gap(pts, pts_count, Some(attach_client_to_update(upd, &c)))
2890 .await
2891 {
2892 Ok(updates) => {
2893 for u in updates {
2894 if utx.try_send(u).is_err() {
2895 tracing::warn!("[layer] update channel full: dropping update");
2896 }
2897 }
2898 }
2899 Err(e) => tracing::warn!("[layer] updateShortChatMessage gap fill: {e}"),
2900 }
2901 });
2902 return;
2903 }
2904
2905 if cid == ID_UPDATE_SHORT_SENT_MSG {
2909 let mut cur = Cursor::from_slice(&body[4..]);
2910 match tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
2911 Ok(m) => {
2912 let pts = m.pts;
2913 let pts_count = m.pts_count;
2914 tracing::debug!(
2915 "[layer] updateShortSentMessage (push): pts={pts} pts_count={pts_count}: advancing pts"
2916 );
2917 let c = self.clone();
2918 let utx = self.inner.update_tx.clone();
2919 tokio::spawn(async move {
2920 match c.check_and_fill_gap(pts, pts_count, None).await {
2921 Ok(replayed) => {
2922 for u in replayed {
2923 if utx.try_send(u).is_err() {
2924 tracing::warn!(
2925 "[layer] update channel full: dropping update"
2926 );
2927 }
2928 }
2929 }
2930 Err(e) => tracing::warn!(
2931 "[layer] updateShortSentMessage push pts advance: {e}"
2932 ),
2933 }
2934 });
2935 }
2936 Err(e) => {
2937 tracing::debug!("[layer] updateShortSentMessage push deserialize error: {e}")
2938 }
2939 }
2940 return;
2941 }
2942
2943 use crate::pts::PtsCheckResult;
2950 use layer_tl_types::{Cursor, Deserializable};
2951
2952 struct ParsedContainer {
2958 seq_info: Option<(i32, i32)>,
2959 users: Vec<tl::enums::User>,
2960 chats: Vec<tl::enums::Chat>,
2961 updates: Vec<tl::enums::Update>,
2962 }
2963
2964 let mut cur = Cursor::from_slice(body);
2965 let parsed: ParsedContainer = match cid {
2966 0x74ae4240 => {
2967 match tl::enums::Updates::deserialize(&mut cur) {
2969 Ok(tl::enums::Updates::Updates(u)) => ParsedContainer {
2970 seq_info: Some((u.seq, u.seq)),
2971 users: u.users,
2972 chats: u.chats,
2973 updates: u.updates,
2974 },
2975 _ => ParsedContainer {
2976 seq_info: None,
2977 users: vec![],
2978 chats: vec![],
2979 updates: vec![],
2980 },
2981 }
2982 }
2983 0x725b04c3 => {
2984 match tl::enums::Updates::deserialize(&mut cur) {
2986 Ok(tl::enums::Updates::Combined(u)) => ParsedContainer {
2987 seq_info: Some((u.seq, u.seq_start)),
2988 users: u.users,
2989 chats: u.chats,
2990 updates: u.updates,
2991 },
2992 _ => ParsedContainer {
2993 seq_info: None,
2994 users: vec![],
2995 chats: vec![],
2996 updates: vec![],
2997 },
2998 }
2999 }
3000 0x78d4dec1 => {
3001 match tl::types::UpdateShort::deserialize(&mut Cursor::from_slice(body)) {
3003 Ok(u) => ParsedContainer {
3004 seq_info: None,
3005 users: vec![],
3006 chats: vec![],
3007 updates: vec![u.update],
3008 },
3009 Err(_) => ParsedContainer {
3010 seq_info: None,
3011 users: vec![],
3012 chats: vec![],
3013 updates: vec![],
3014 },
3015 }
3016 }
3017 _ => ParsedContainer {
3018 seq_info: None,
3019 users: vec![],
3020 chats: vec![],
3021 updates: vec![],
3022 },
3023 };
3024
3025 if !parsed.users.is_empty() || !parsed.chats.is_empty() {
3027 self.cache_users_and_chats(&parsed.users, &parsed.chats)
3028 .await;
3029 }
3030
3031 if let Some((seq, seq_start)) = parsed.seq_info
3033 && seq != 0
3034 {
3035 let result = self.inner.pts_state.lock().await.check_seq(seq, seq_start);
3036 match result {
3037 PtsCheckResult::Ok => {
3038 }
3040 PtsCheckResult::Duplicate => {
3041 tracing::debug!(
3043 "[layer] seq duplicate (seq={seq}, seq_start={seq_start}): dropping container"
3044 );
3045 return;
3046 }
3047 PtsCheckResult::Gap { expected, got } => {
3048 tracing::warn!(
3051 "[layer] seq gap: expected {expected}, got {got}: getDifference"
3052 );
3053 let c = self.clone();
3054 let utx = self.inner.update_tx.clone();
3055 tokio::spawn(async move {
3056 match c.get_difference().await {
3057 Ok(updates) => {
3058 for u in updates {
3059 if utx.try_send(u).is_err() {
3060 tracing::warn!(
3061 "[layer] update channel full: dropping seq gap update"
3062 );
3063 break;
3064 }
3065 }
3066 }
3067 Err(e) => tracing::warn!("[layer] seq gap fill: {e}"),
3068 }
3069 });
3070 return; }
3072 }
3073 }
3074
3075 let mut raw: Vec<tl::enums::Update> = parsed.updates;
3076
3077 raw.sort_by_key(Self::update_sort_key);
3082
3083 for upd in raw {
3084 self.dispatch_single_update(upd).await;
3085 }
3086
3087 if let Some((seq, _)) = parsed.seq_info
3093 && seq != 0
3094 {
3095 self.inner.pts_state.lock().await.advance_seq(seq);
3096 }
3097 }
3098
3099 async fn dispatch_single_update(&self, upd: tl::enums::Update) {
3102 enum Kind {
3105 GlobalPts {
3106 pts: i32,
3107 pts_count: i32,
3108 carry: bool,
3109 },
3110 ChannelPts {
3111 channel_id: i64,
3112 pts: i32,
3113 pts_count: i32,
3114 carry: bool,
3115 },
3116 Qts {
3117 qts: i32,
3118 },
3119 Passthrough,
3120 }
3121
3122 fn ch_from_msg(msg: &tl::enums::Message) -> i64 {
3123 if let tl::enums::Message::Message(m) = msg
3124 && let tl::enums::Peer::Channel(c) = &m.peer_id
3125 {
3126 return c.channel_id;
3127 }
3128 0
3129 }
3130
3131 let kind = {
3132 use tl::enums::Update::*;
3133 match &upd {
3134 NewMessage(u) => Kind::GlobalPts {
3135 pts: u.pts,
3136 pts_count: u.pts_count,
3137 carry: true,
3138 },
3139 EditMessage(u) => Kind::GlobalPts {
3140 pts: u.pts,
3141 pts_count: u.pts_count,
3142 carry: true,
3143 },
3144 DeleteMessages(u) => Kind::GlobalPts {
3145 pts: u.pts,
3146 pts_count: u.pts_count,
3147 carry: true,
3148 },
3149 ReadHistoryInbox(u) => Kind::GlobalPts {
3150 pts: u.pts,
3151 pts_count: u.pts_count,
3152 carry: false,
3153 },
3154 ReadHistoryOutbox(u) => Kind::GlobalPts {
3155 pts: u.pts,
3156 pts_count: u.pts_count,
3157 carry: false,
3158 },
3159 NewChannelMessage(u) => Kind::ChannelPts {
3160 channel_id: ch_from_msg(&u.message),
3161 pts: u.pts,
3162 pts_count: u.pts_count,
3163 carry: true,
3164 },
3165 EditChannelMessage(u) => Kind::ChannelPts {
3166 channel_id: ch_from_msg(&u.message),
3167 pts: u.pts,
3168 pts_count: u.pts_count,
3169 carry: true,
3170 },
3171 DeleteChannelMessages(u) => Kind::ChannelPts {
3172 channel_id: u.channel_id,
3173 pts: u.pts,
3174 pts_count: u.pts_count,
3175 carry: true,
3176 },
3177 NewEncryptedMessage(u) => Kind::Qts { qts: u.qts },
3178 _ => Kind::Passthrough,
3179 }
3180 };
3181
3182 let high = update::from_single_update_pub(upd);
3183
3184 let to_send: Vec<update::Update> = match kind {
3185 Kind::GlobalPts {
3186 pts,
3187 pts_count,
3188 carry,
3189 } => {
3190 let first = if carry { high.into_iter().next() } else { None };
3191 let c = self.clone();
3195 let utx = self.inner.update_tx.clone();
3196 tokio::spawn(async move {
3197 match c.check_and_fill_gap(pts, pts_count, first).await {
3198 Ok(v) => {
3199 for u in v {
3200 let u = attach_client_to_update(u, &c);
3201 if utx.try_send(u).is_err() {
3202 tracing::warn!("[layer] update channel full: dropping update");
3203 break;
3204 }
3205 }
3206 }
3207 Err(e) => tracing::warn!("[layer] pts gap: {e}"),
3208 }
3209 });
3210 vec![]
3211 }
3212 Kind::ChannelPts {
3213 channel_id,
3214 pts,
3215 pts_count,
3216 carry,
3217 } => {
3218 let first = if carry { high.into_iter().next() } else { None };
3219 if channel_id != 0 {
3220 let c = self.clone();
3222 let utx = self.inner.update_tx.clone();
3223 tokio::spawn(async move {
3224 match c
3225 .check_and_fill_channel_gap(channel_id, pts, pts_count, first)
3226 .await
3227 {
3228 Ok(v) => {
3229 for u in v {
3230 let u = attach_client_to_update(u, &c);
3231 if utx.try_send(u).is_err() {
3232 tracing::warn!(
3233 "[layer] update channel full: dropping update"
3234 );
3235 break;
3236 }
3237 }
3238 }
3239 Err(e) => tracing::warn!("[layer] ch pts gap: {e}"),
3240 }
3241 });
3242 vec![]
3243 } else {
3244 first.into_iter().collect()
3245 }
3246 }
3247 Kind::Qts { qts } => {
3248 let c = self.clone();
3250 tokio::spawn(async move {
3251 if let Err(e) = c.check_and_fill_qts_gap(qts, 1).await {
3252 tracing::warn!("[layer] qts gap: {e}");
3253 }
3254 });
3255 vec![]
3256 }
3257 Kind::Passthrough => high
3258 .into_iter()
3259 .map(|u| match u {
3260 update::Update::NewMessage(msg) => {
3261 update::Update::NewMessage(msg.with_client(self.clone()))
3262 }
3263 update::Update::MessageEdited(msg) => {
3264 update::Update::MessageEdited(msg.with_client(self.clone()))
3265 }
3266 other => other,
3267 })
3268 .collect(),
3269 };
3270
3271 for u in to_send {
3272 if self.inner.update_tx.try_send(u).is_err() {
3273 tracing::warn!("[layer] update channel full: dropping update");
3274 }
3275 }
3276 }
3277
/// Keeps trying to re-establish the connection until one attempt succeeds.
///
/// Each iteration waits for the current backoff delay (or for a message on
/// `network_hint_rx`, whichever comes first) and then calls `do_reconnect`.
/// On success the caller's read half, frame kind, auth key and session id
/// (`rh`, `fk`, `ak`, `sid`) are overwritten with the new values, and a
/// background task is spawned that initialises the session and fetches the
/// updates missed in the meantime.
///
/// Returns `Some(receiver)` that resolves with the outcome of the session
/// initialisation, or `None` when `network_hint_rx` has been closed.
async fn do_reconnect_loop(
    &self,
    initial_delay_ms: u64,
    rh: &mut OwnedReadHalf,
    fk: &mut FrameKind,
    ak: &mut [u8; 256],
    sid: &mut i64,
    network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
    // An explicit 0 means "retry immediately"; any other value is raised to at
    // least RECONNECT_BASE_MS.
    let mut delay_ms = if initial_delay_ms == 0 {
        0
    } else {
        initial_delay_ms.max(RECONNECT_BASE_MS)
    };
    loop {
        tracing::debug!("[layer] Reconnecting in {delay_ms} ms …");
        tokio::select! {
            _ = sleep(Duration::from_millis(delay_ms)) => {}
            hint = network_hint_rx.recv() => {
                // `recv()` yields `None` once the channel is closed; `?` then
                // ends the whole loop with `None`.
                hint?; tracing::debug!("[layer] Network hint → skipping backoff, reconnecting now");
            }
        }

        match self.do_reconnect(ak, fk).await {
            Ok((new_rh, new_fk, new_ak, new_sid)) => {
                // Hand the fresh connection state back to the caller.
                *rh = new_rh;
                *fk = new_fk;
                *ak = new_ak;
                *sid = new_sid;
                tracing::debug!("[layer] TCP reconnected ✓: initialising session …");

                // Session initialisation runs in its own task; the caller gets
                // the receiving end to learn the result.
                let (init_tx, init_rx) = oneshot::channel();
                let c = self.clone();
                let utx = self.inner.update_tx.clone();
                tokio::spawn(async move {
                    // Retry init_connection for as long as the server answers
                    // with a flood wait; any other error ends the loop.
                    let result = loop {
                        match c.init_connection().await {
                            Ok(()) => break Ok(()),
                            Err(InvocationError::Rpc(ref r))
                                if r.flood_wait_seconds().is_some() =>
                            {
                                // The guard above guarantees `Some`.
                                let secs = r.flood_wait_seconds().unwrap();
                                tracing::warn!(
                                    "[layer] init_connection FLOOD_WAIT_{secs}: waiting before retry"
                                );
                                sleep(Duration::from_secs(secs + 1)).await;
                            }
                            Err(e) => break Err(e),
                        }
                    };
                    if result.is_ok() {
                        // NOTE(review): the fixed 2 s pause while
                        // `dh_in_progress` is set presumably gives a running
                        // key exchange time to finish; confirm against the
                        // code that sets the flag.
                        if c.inner
                            .dh_in_progress
                            .load(std::sync::atomic::Ordering::SeqCst)
                        {
                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        }
                        // Catch up on updates missed while disconnected. Any
                        // failure degrades to "no missed updates".
                        let missed = match c.get_difference().await {
                            Ok(updates) => updates,
                            Err(ref e)
                                if matches!(e,
                                    InvocationError::Rpc(r) if r.code == 401) =>
                            {
                                tracing::warn!(
                                    "[layer] getDifference AUTH_KEY_UNREGISTERED after \
                                     fresh DH: falling back to sync_pts_state"
                                );
                                // Best effort: the result is deliberately ignored.
                                let _ = c.sync_pts_state().await;
                                vec![]
                            }
                            Err(e) => {
                                tracing::warn!(
                                    "[layer] getDifference failed after reconnect: {e}"
                                );
                                vec![]
                            }
                        };
                        for u in missed {
                            if utx.try_send(attach_client_to_update(u, &c)).is_err() {
                                tracing::warn!(
                                    "[layer] update channel full: dropping catch-up update"
                                );
                                break;
                            }
                        }
                    }
                    // The receiver may already be gone; that is not an error.
                    let _ = init_tx.send(result);
                });
                return Some(init_rx);
            }
            Err(e) => {
                tracing::warn!("[layer] Reconnect attempt failed: {e}");
                // Exponential backoff: double the delay, keep it within
                // [RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1000], then let
                // `jitter_delay` adjust it.
                let next = delay_ms
                    .saturating_mul(2)
                    .clamp(RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1_000);
                delay_ms = jitter_delay(next).as_millis() as u64;
            }
        }
    }
}
3408
3409 async fn do_reconnect(
3411 &self,
3412 _old_auth_key: &[u8; 256],
3413 _old_frame_kind: &FrameKind,
3414 ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
3415 let home_dc_id = *self.inner.home_dc_id.lock().await;
3416 let (addr, saved_key, first_salt, time_offset) = {
3417 let opts = self.inner.dc_options.lock().await;
3418 match opts.get(&home_dc_id) {
3419 Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
3420 None => (
3421 crate::dc_migration::fallback_dc_addr(home_dc_id).to_string(),
3422 None,
3423 0,
3424 0,
3425 ),
3426 }
3427 };
3428 let socks5 = self.inner.socks5.clone();
3429 let mtproxy = self.inner.mtproxy.clone();
3430 let transport = self.inner.transport.clone();
3431
3432 let new_conn = if let Some(key) = saved_key {
3433 tracing::debug!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
3434 match Connection::connect_with_key(
3435 &addr,
3436 key,
3437 first_salt,
3438 time_offset,
3439 socks5.as_ref(),
3440 &transport,
3441 home_dc_id as i16,
3442 )
3443 .await
3444 {
3445 Ok(c) => c,
3446 Err(e) => {
3447 return Err(e);
3448 }
3449 }
3450 } else {
3451 Connection::connect_raw(
3452 &addr,
3453 socks5.as_ref(),
3454 mtproxy.as_ref(),
3455 &transport,
3456 home_dc_id as i16,
3457 )
3458 .await?
3459 };
3460
3461 let (new_writer, new_wh, new_read, new_fk) = new_conn.into_writer();
3462 let new_ak = new_writer.enc.auth_key_bytes();
3463 let new_sid = new_writer.enc.session_id();
3464 *self.inner.writer.lock().await = new_writer;
3465 *self.inner.write_half.lock().await = new_wh;
3466
3467 self.inner
3475 .salt_request_in_flight
3476 .store(false, std::sync::atomic::Ordering::SeqCst);
3477
3478 {
3483 let mut opts = self.inner.dc_options.lock().await;
3484 if let Some(entry) = opts.get_mut(&home_dc_id) {
3485 entry.auth_key = Some(new_ak);
3486 }
3487 }
3488
3489 Ok((new_read, new_fk, new_ak, new_sid))
3501 }
3502
3503 pub async fn send_message(
3507 &self,
3508 peer: &str,
3509 text: &str,
3510 ) -> Result<update::IncomingMessage, InvocationError> {
3511 let p = self.resolve_peer(peer).await?;
3512 self.send_message_to_peer(p, text).await
3513 }
3514
3515 pub async fn send_message_to_peer(
3520 &self,
3521 peer: impl Into<PeerRef>,
3522 text: &str,
3523 ) -> Result<update::IncomingMessage, InvocationError> {
3524 self.send_message_to_peer_ex(peer, &InputMessage::text(text))
3525 .await
3526 }
3527
3528 pub async fn send_message_to_peer_ex(
3533 &self,
3534 peer: impl Into<PeerRef>,
3535 msg: &InputMessage,
3536 ) -> Result<update::IncomingMessage, InvocationError> {
3537 let peer = peer.into().resolve(self).await?;
3538 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
3539 let schedule = if msg.schedule_once_online {
3540 Some(0x7FFF_FFFEi32)
3541 } else {
3542 msg.schedule_date
3543 };
3544
3545 if let Some(media) = &msg.media {
3547 let req = tl::functions::messages::SendMedia {
3548 silent: msg.silent,
3549 background: msg.background,
3550 clear_draft: msg.clear_draft,
3551 noforwards: false,
3552 update_stickersets_order: false,
3553 invert_media: msg.invert_media,
3554 allow_paid_floodskip: false,
3555 peer: input_peer,
3556 reply_to: msg.reply_header(),
3557 media: media.clone(),
3558 message: msg.text.clone(),
3559 random_id: random_i64(),
3560 reply_markup: msg.reply_markup.clone(),
3561 entities: msg.entities.clone(),
3562 schedule_date: schedule,
3563 schedule_repeat_period: None,
3564 send_as: None,
3565 quick_reply_shortcut: None,
3566 effect: None,
3567 allow_paid_stars: None,
3568 suggested_post: None,
3569 };
3570 let body = self.rpc_call_raw_pub(&req).await?;
3571 return Ok(self.extract_sent_message(&body, msg, &peer));
3572 }
3573
3574 let req = tl::functions::messages::SendMessage {
3575 no_webpage: msg.no_webpage,
3576 silent: msg.silent,
3577 background: msg.background,
3578 clear_draft: msg.clear_draft,
3579 noforwards: false,
3580 update_stickersets_order: false,
3581 invert_media: msg.invert_media,
3582 allow_paid_floodskip: false,
3583 peer: input_peer,
3584 reply_to: msg.reply_header(),
3585 message: msg.text.clone(),
3586 random_id: random_i64(),
3587 reply_markup: msg.reply_markup.clone(),
3588 entities: msg.entities.clone(),
3589 schedule_date: schedule,
3590 schedule_repeat_period: None,
3591 send_as: None,
3592 quick_reply_shortcut: None,
3593 effect: None,
3594 allow_paid_stars: None,
3595 suggested_post: None,
3596 };
3597 let body = self.rpc_call_raw(&req).await?;
3598 Ok(self.extract_sent_message(&body, msg, &peer))
3599 }
3600
3601 fn extract_sent_message(
3605 &self,
3606 body: &[u8],
3607 input: &InputMessage,
3608 peer: &tl::enums::Peer,
3609 ) -> update::IncomingMessage {
3610 if body.len() < 4 {
3611 return self.synthetic_sent(input, peer, 0, 0);
3612 }
3613 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
3614
3615 if cid == 0x74ae4240 || cid == 0x725b04c3 {
3617 let mut cur = Cursor::from_slice(body);
3618 if let Ok(tl::enums::Updates::Updates(u)) = tl::enums::Updates::deserialize(&mut cur) {
3619 for upd in &u.updates {
3620 if let tl::enums::Update::NewMessage(nm) = upd {
3621 return update::IncomingMessage::from_raw(nm.message.clone())
3622 .with_client(self.clone());
3623 }
3624 if let tl::enums::Update::NewChannelMessage(nm) = upd {
3625 return update::IncomingMessage::from_raw(nm.message.clone())
3626 .with_client(self.clone());
3627 }
3628 }
3629 }
3630 if let Ok(tl::enums::Updates::Combined(u)) =
3631 tl::enums::Updates::deserialize(&mut Cursor::from_slice(body))
3632 {
3633 for upd in &u.updates {
3634 if let tl::enums::Update::NewMessage(nm) = upd {
3635 return update::IncomingMessage::from_raw(nm.message.clone())
3636 .with_client(self.clone());
3637 }
3638 if let tl::enums::Update::NewChannelMessage(nm) = upd {
3639 return update::IncomingMessage::from_raw(nm.message.clone())
3640 .with_client(self.clone());
3641 }
3642 }
3643 }
3644 }
3645
3646 if cid == 0x9015e101 {
3649 let mut cur = Cursor::from_slice(&body[4..]);
3650 if let Ok(sent) = tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
3651 return self.synthetic_sent_from_short(sent, input, peer);
3652 }
3653 }
3654
3655 if cid == 0x313bc7f8 {
3657 let mut cur = Cursor::from_slice(&body[4..]);
3658 if let Ok(m) = tl::types::UpdateShortMessage::deserialize(&mut cur) {
3659 let msg = tl::types::Message {
3660 out: m.out,
3661 mentioned: m.mentioned,
3662 media_unread: m.media_unread,
3663 silent: m.silent,
3664 post: false,
3665 from_scheduled: false,
3666 legacy: false,
3667 edit_hide: false,
3668 pinned: false,
3669 noforwards: false,
3670 invert_media: false,
3671 offline: false,
3672 video_processing_pending: false,
3673 paid_suggested_post_stars: false,
3674 paid_suggested_post_ton: false,
3675 id: m.id,
3676 from_id: Some(tl::enums::Peer::User(tl::types::PeerUser {
3677 user_id: m.user_id,
3678 })),
3679 peer_id: tl::enums::Peer::User(tl::types::PeerUser { user_id: m.user_id }),
3680 saved_peer_id: None,
3681 fwd_from: m.fwd_from,
3682 via_bot_id: m.via_bot_id,
3683 via_business_bot_id: None,
3684 reply_to: m.reply_to,
3685 date: m.date,
3686 message: m.message,
3687 media: None,
3688 reply_markup: None,
3689 entities: m.entities,
3690 views: None,
3691 forwards: None,
3692 replies: None,
3693 edit_date: None,
3694 post_author: None,
3695 grouped_id: None,
3696 reactions: None,
3697 restriction_reason: None,
3698 ttl_period: None,
3699 quick_reply_shortcut_id: None,
3700 effect: None,
3701 factcheck: None,
3702 report_delivery_until_date: None,
3703 paid_message_stars: None,
3704 suggested_post: None,
3705 from_rank: None,
3706 from_boosts_applied: None,
3707 schedule_repeat_period: None,
3708 summary_from_language: None,
3709 };
3710 return update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
3711 .with_client(self.clone());
3712 }
3713 }
3714
3715 self.synthetic_sent(input, peer, 0, 0)
3717 }
3718
3719 fn synthetic_sent_from_short(
3721 &self,
3722 sent: tl::types::UpdateShortSentMessage,
3723 input: &InputMessage,
3724 peer: &tl::enums::Peer,
3725 ) -> update::IncomingMessage {
3726 let msg = tl::types::Message {
3727 out: sent.out,
3728 mentioned: false,
3729 media_unread: false,
3730 silent: input.silent,
3731 post: false,
3732 from_scheduled: false,
3733 legacy: false,
3734 edit_hide: false,
3735 pinned: false,
3736 noforwards: false,
3737 invert_media: input.invert_media,
3738 offline: false,
3739 video_processing_pending: false,
3740 paid_suggested_post_stars: false,
3741 paid_suggested_post_ton: false,
3742 id: sent.id,
3743 from_id: None,
3744 from_boosts_applied: None,
3745 from_rank: None,
3746 peer_id: peer.clone(),
3747 saved_peer_id: None,
3748 fwd_from: None,
3749 via_bot_id: None,
3750 via_business_bot_id: None,
3751 reply_to: input.reply_to.map(|id| {
3752 tl::enums::MessageReplyHeader::MessageReplyHeader(tl::types::MessageReplyHeader {
3753 reply_to_scheduled: false,
3754 forum_topic: false,
3755 quote: false,
3756 reply_to_msg_id: Some(id),
3757 reply_to_peer_id: None,
3758 reply_from: None,
3759 reply_media: None,
3760 reply_to_top_id: None,
3761 quote_text: None,
3762 quote_entities: None,
3763 quote_offset: None,
3764 todo_item_id: None,
3765 poll_option: None,
3766 })
3767 }),
3768 date: sent.date,
3769 message: input.text.clone(),
3770 media: sent.media,
3771 reply_markup: input.reply_markup.clone(),
3772 entities: sent.entities,
3773 views: None,
3774 forwards: None,
3775 replies: None,
3776 edit_date: None,
3777 post_author: None,
3778 grouped_id: None,
3779 reactions: None,
3780 restriction_reason: None,
3781 ttl_period: sent.ttl_period,
3782 quick_reply_shortcut_id: None,
3783 effect: None,
3784 factcheck: None,
3785 report_delivery_until_date: None,
3786 paid_message_stars: None,
3787 suggested_post: None,
3788 schedule_repeat_period: None,
3789 summary_from_language: None,
3790 };
3791 update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
3792 .with_client(self.clone())
3793 }
3794
3795 fn synthetic_sent(
3797 &self,
3798 input: &InputMessage,
3799 peer: &tl::enums::Peer,
3800 id: i32,
3801 date: i32,
3802 ) -> update::IncomingMessage {
3803 let msg = tl::types::Message {
3804 out: true,
3805 mentioned: false,
3806 media_unread: false,
3807 silent: input.silent,
3808 post: false,
3809 from_scheduled: false,
3810 legacy: false,
3811 edit_hide: false,
3812 pinned: false,
3813 noforwards: false,
3814 invert_media: input.invert_media,
3815 offline: false,
3816 video_processing_pending: false,
3817 paid_suggested_post_stars: false,
3818 paid_suggested_post_ton: false,
3819 id,
3820 from_id: None,
3821 from_boosts_applied: None,
3822 from_rank: None,
3823 peer_id: peer.clone(),
3824 saved_peer_id: None,
3825 fwd_from: None,
3826 via_bot_id: None,
3827 via_business_bot_id: None,
3828 reply_to: input.reply_to.map(|rid| {
3829 tl::enums::MessageReplyHeader::MessageReplyHeader(tl::types::MessageReplyHeader {
3830 reply_to_scheduled: false,
3831 forum_topic: false,
3832 quote: false,
3833 reply_to_msg_id: Some(rid),
3834 reply_to_peer_id: None,
3835 reply_from: None,
3836 reply_media: None,
3837 reply_to_top_id: None,
3838 quote_text: None,
3839 quote_entities: None,
3840 quote_offset: None,
3841 todo_item_id: None,
3842 poll_option: None,
3843 })
3844 }),
3845 date,
3846 message: input.text.clone(),
3847 media: None,
3848 reply_markup: input.reply_markup.clone(),
3849 entities: input.entities.clone(),
3850 views: None,
3851 forwards: None,
3852 replies: None,
3853 edit_date: None,
3854 post_author: None,
3855 grouped_id: None,
3856 reactions: None,
3857 restriction_reason: None,
3858 ttl_period: None,
3859 quick_reply_shortcut_id: None,
3860 effect: None,
3861 factcheck: None,
3862 report_delivery_until_date: None,
3863 paid_message_stars: None,
3864 suggested_post: None,
3865 schedule_repeat_period: None,
3866 summary_from_language: None,
3867 };
3868 update::IncomingMessage::from_raw(tl::enums::Message::Message(msg))
3869 .with_client(self.clone())
3870 }
3871
3872 pub async fn send_to_self(
3874 &self,
3875 text: &str,
3876 ) -> Result<update::IncomingMessage, InvocationError> {
3877 let req = tl::functions::messages::SendMessage {
3878 no_webpage: false,
3879 silent: false,
3880 background: false,
3881 clear_draft: false,
3882 noforwards: false,
3883 update_stickersets_order: false,
3884 invert_media: false,
3885 allow_paid_floodskip: false,
3886 peer: tl::enums::InputPeer::PeerSelf,
3887 reply_to: None,
3888 message: text.to_string(),
3889 random_id: random_i64(),
3890 reply_markup: None,
3891 entities: None,
3892 schedule_date: None,
3893 schedule_repeat_period: None,
3894 send_as: None,
3895 quick_reply_shortcut: None,
3896 effect: None,
3897 allow_paid_stars: None,
3898 suggested_post: None,
3899 };
3900 let body = self.rpc_call_raw(&req).await?;
3901 let self_peer = tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 });
3902 Ok(self.extract_sent_message(&body, &InputMessage::text(text), &self_peer))
3903 }
3904
3905 pub async fn edit_message(
3907 &self,
3908 peer: impl Into<PeerRef>,
3909 message_id: i32,
3910 new_text: &str,
3911 ) -> Result<(), InvocationError> {
3912 let peer = peer.into().resolve(self).await?;
3913 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
3914 let req = tl::functions::messages::EditMessage {
3915 no_webpage: false,
3916 invert_media: false,
3917 peer: input_peer,
3918 id: message_id,
3919 message: Some(new_text.to_string()),
3920 media: None,
3921 reply_markup: None,
3922 entities: None,
3923 schedule_date: None,
3924 schedule_repeat_period: None,
3925 quick_reply_shortcut_id: None,
3926 };
3927 self.rpc_write(&req).await
3928 }
3929
3930 pub async fn forward_messages(
3932 &self,
3933 destination: impl Into<PeerRef>,
3934 message_ids: &[i32],
3935 source: impl Into<PeerRef>,
3936 ) -> Result<(), InvocationError> {
3937 let dest = destination.into().resolve(self).await?;
3938 let src = source.into().resolve(self).await?;
3939 let cache = self.inner.peer_cache.read().await;
3940 let to_peer = cache.peer_to_input(&dest);
3941 let from_peer = cache.peer_to_input(&src);
3942 drop(cache);
3943
3944 let req = tl::functions::messages::ForwardMessages {
3945 silent: false,
3946 background: false,
3947 with_my_score: false,
3948 drop_author: false,
3949 drop_media_captions: false,
3950 noforwards: false,
3951 from_peer,
3952 id: message_ids.to_vec(),
3953 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
3954 to_peer,
3955 top_msg_id: None,
3956 reply_to: None,
3957 schedule_date: None,
3958 schedule_repeat_period: None,
3959 send_as: None,
3960 quick_reply_shortcut: None,
3961 effect: None,
3962 video_timestamp: None,
3963 allow_paid_stars: None,
3964 allow_paid_floodskip: false,
3965 suggested_post: None,
3966 };
3967 self.rpc_write(&req).await
3968 }
3969
3970 pub async fn forward_messages_returning(
3975 &self,
3976 destination: impl Into<PeerRef>,
3977 message_ids: &[i32],
3978 source: impl Into<PeerRef>,
3979 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
3980 let dest = destination.into().resolve(self).await?;
3981 let src = source.into().resolve(self).await?;
3982 let cache = self.inner.peer_cache.read().await;
3983 let to_peer = cache.peer_to_input(&dest);
3984 let from_peer = cache.peer_to_input(&src);
3985 drop(cache);
3986
3987 let req = tl::functions::messages::ForwardMessages {
3988 silent: false,
3989 background: false,
3990 with_my_score: false,
3991 drop_author: false,
3992 drop_media_captions: false,
3993 noforwards: false,
3994 from_peer,
3995 id: message_ids.to_vec(),
3996 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
3997 to_peer,
3998 top_msg_id: None,
3999 reply_to: None,
4000 schedule_date: None,
4001 schedule_repeat_period: None,
4002 send_as: None,
4003 quick_reply_shortcut: None,
4004 effect: None,
4005 video_timestamp: None,
4006 allow_paid_stars: None,
4007 allow_paid_floodskip: false,
4008 suggested_post: None,
4009 };
4010 let body = self.rpc_call_raw(&req).await?;
4011 let mut out = Vec::new();
4013 if body.len() >= 4 {
4014 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
4015 if cid == 0x74ae4240 || cid == 0x725b04c3 {
4016 let mut cur = Cursor::from_slice(&body);
4017 let updates_opt = tl::enums::Updates::deserialize(&mut cur).ok();
4018 let raw_updates = match updates_opt {
4019 Some(tl::enums::Updates::Updates(u)) => u.updates,
4020 Some(tl::enums::Updates::Combined(u)) => u.updates,
4021 _ => vec![],
4022 };
4023 for upd in raw_updates {
4024 match upd {
4025 tl::enums::Update::NewMessage(u) => {
4026 out.push(
4027 update::IncomingMessage::from_raw(u.message)
4028 .with_client(self.clone()),
4029 );
4030 }
4031 tl::enums::Update::NewChannelMessage(u) => {
4032 out.push(
4033 update::IncomingMessage::from_raw(u.message)
4034 .with_client(self.clone()),
4035 );
4036 }
4037 _ => {}
4038 }
4039 }
4040 }
4041 }
4042 Ok(out)
4043 }
4044
4045 pub async fn delete_messages(
4047 &self,
4048 message_ids: Vec<i32>,
4049 revoke: bool,
4050 ) -> Result<(), InvocationError> {
4051 let req = tl::functions::messages::DeleteMessages {
4052 revoke,
4053 id: message_ids,
4054 };
4055 self.rpc_write(&req).await
4056 }
4057
4058 pub async fn get_messages_by_id(
4060 &self,
4061 peer: impl Into<PeerRef>,
4062 ids: &[i32],
4063 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4064 let peer = peer.into().resolve(self).await?;
4065 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4066 let id_list: Vec<tl::enums::InputMessage> = ids
4067 .iter()
4068 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
4069 .collect();
4070 let req = tl::functions::channels::GetMessages {
4071 channel: match &input_peer {
4072 tl::enums::InputPeer::Channel(c) => {
4073 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4074 channel_id: c.channel_id,
4075 access_hash: c.access_hash,
4076 })
4077 }
4078 _ => return self.get_messages_user(input_peer, id_list).await,
4079 },
4080 id: id_list,
4081 };
4082 let body = self.rpc_call_raw(&req).await?;
4083 let mut cur = Cursor::from_slice(&body);
4084 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4085 tl::enums::messages::Messages::Messages(m) => m.messages,
4086 tl::enums::messages::Messages::Slice(m) => m.messages,
4087 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4088 tl::enums::messages::Messages::NotModified(_) => vec![],
4089 };
4090 Ok(msgs
4091 .into_iter()
4092 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4093 .collect())
4094 }
4095
4096 async fn get_messages_user(
4097 &self,
4098 _peer: tl::enums::InputPeer,
4099 ids: Vec<tl::enums::InputMessage>,
4100 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4101 let req = tl::functions::messages::GetMessages { id: ids };
4102 let body = self.rpc_call_raw(&req).await?;
4103 let mut cur = Cursor::from_slice(&body);
4104 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4105 tl::enums::messages::Messages::Messages(m) => m.messages,
4106 tl::enums::messages::Messages::Slice(m) => m.messages,
4107 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4108 tl::enums::messages::Messages::NotModified(_) => vec![],
4109 };
4110 Ok(msgs
4111 .into_iter()
4112 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4113 .collect())
4114 }
4115
4116 pub async fn get_pinned_message(
4118 &self,
4119 peer: impl Into<PeerRef>,
4120 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
4121 let peer = peer.into().resolve(self).await?;
4122 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4123 let req = tl::functions::messages::Search {
4124 peer: input_peer,
4125 q: String::new(),
4126 from_id: None,
4127 saved_peer_id: None,
4128 saved_reaction: None,
4129 top_msg_id: None,
4130 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
4131 min_date: 0,
4132 max_date: 0,
4133 offset_id: 0,
4134 add_offset: 0,
4135 limit: 1,
4136 max_id: 0,
4137 min_id: 0,
4138 hash: 0,
4139 };
4140 let body = self.rpc_call_raw(&req).await?;
4141 let mut cur = Cursor::from_slice(&body);
4142 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4143 tl::enums::messages::Messages::Messages(m) => m.messages,
4144 tl::enums::messages::Messages::Slice(m) => m.messages,
4145 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4146 tl::enums::messages::Messages::NotModified(_) => vec![],
4147 };
4148 Ok(msgs
4149 .into_iter()
4150 .next()
4151 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone())))
4152 }
4153
4154 pub async fn pin_message(
4156 &self,
4157 peer: impl Into<PeerRef>,
4158 message_id: i32,
4159 silent: bool,
4160 unpin: bool,
4161 pm_oneside: bool,
4162 ) -> Result<(), InvocationError> {
4163 let peer = peer.into().resolve(self).await?;
4164 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4165 let req = tl::functions::messages::UpdatePinnedMessage {
4166 silent,
4167 unpin,
4168 pm_oneside,
4169 peer: input_peer,
4170 id: message_id,
4171 };
4172 self.rpc_write(&req).await
4173 }
4174
4175 pub async fn unpin_message(
4177 &self,
4178 peer: impl Into<PeerRef>,
4179 message_id: i32,
4180 ) -> Result<(), InvocationError> {
4181 self.pin_message(peer, message_id, true, true, false).await
4182 }
4183
4184 pub async fn get_reply_to_message(
4199 &self,
4200 message: &update::IncomingMessage,
4201 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
4202 let reply_id = match message.reply_to_message_id() {
4203 Some(id) => id,
4204 None => return Ok(None),
4205 };
4206 let peer = match message.peer_id() {
4207 Some(p) => p.clone(),
4208 None => return Ok(None),
4209 };
4210 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4211 let id = vec![tl::enums::InputMessage::Id(tl::types::InputMessageId {
4212 id: reply_id,
4213 })];
4214
4215 let result = match &input_peer {
4216 tl::enums::InputPeer::Channel(c) => {
4217 let req = tl::functions::channels::GetMessages {
4218 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4219 channel_id: c.channel_id,
4220 access_hash: c.access_hash,
4221 }),
4222 id,
4223 };
4224 self.rpc_call_raw(&req).await?
4225 }
4226 _ => {
4227 let req = tl::functions::messages::GetMessages { id };
4228 self.rpc_call_raw(&req).await?
4229 }
4230 };
4231
4232 let mut cur = Cursor::from_slice(&result);
4233 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4234 tl::enums::messages::Messages::Messages(m) => m.messages,
4235 tl::enums::messages::Messages::Slice(m) => m.messages,
4236 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4237 tl::enums::messages::Messages::NotModified(_) => vec![],
4238 };
4239 Ok(msgs
4240 .into_iter()
4241 .next()
4242 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone())))
4243 }
4244
4245 pub async fn unpin_all_messages(
4247 &self,
4248 peer: impl Into<PeerRef>,
4249 ) -> Result<(), InvocationError> {
4250 let peer = peer.into().resolve(self).await?;
4251 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4252 let req = tl::functions::messages::UnpinAllMessages {
4253 peer: input_peer,
4254 top_msg_id: None,
4255 saved_peer_id: None,
4256 };
4257 self.rpc_write(&req).await
4258 }
4259
4260 pub async fn search_messages(
4265 &self,
4266 peer: impl Into<PeerRef>,
4267 query: &str,
4268 limit: i32,
4269 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4270 self.search(peer, query).limit(limit).fetch(self).await
4271 }
4272
4273 pub fn search(&self, peer: impl Into<PeerRef>, query: &str) -> SearchBuilder {
4275 SearchBuilder::new(peer.into(), query.to_string())
4276 }
4277
4278 pub async fn search_global(
4280 &self,
4281 query: &str,
4282 limit: i32,
4283 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4284 self.search_global_builder(query)
4285 .limit(limit)
4286 .fetch(self)
4287 .await
4288 }
4289
4290 pub fn search_global_builder(&self, query: &str) -> GlobalSearchBuilder {
4292 GlobalSearchBuilder::new(query.to_string())
4293 }
4294
4295 pub async fn get_scheduled_messages(
4312 &self,
4313 peer: impl Into<PeerRef>,
4314 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4315 let peer = peer.into().resolve(self).await?;
4316 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4317 let req = tl::functions::messages::GetScheduledHistory {
4318 peer: input_peer,
4319 hash: 0,
4320 };
4321 let body = self.rpc_call_raw(&req).await?;
4322 let mut cur = Cursor::from_slice(&body);
4323 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4324 tl::enums::messages::Messages::Messages(m) => m.messages,
4325 tl::enums::messages::Messages::Slice(m) => m.messages,
4326 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4327 tl::enums::messages::Messages::NotModified(_) => vec![],
4328 };
4329 Ok(msgs
4330 .into_iter()
4331 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4332 .collect())
4333 }
4334
4335 pub async fn delete_scheduled_messages(
4337 &self,
4338 peer: impl Into<PeerRef>,
4339 ids: Vec<i32>,
4340 ) -> Result<(), InvocationError> {
4341 let peer = peer.into().resolve(self).await?;
4342 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4343 let req = tl::functions::messages::DeleteScheduledMessages {
4344 peer: input_peer,
4345 id: ids,
4346 };
4347 self.rpc_write(&req).await
4348 }
4349
4350 pub async fn edit_inline_message(
4368 &self,
4369 id: tl::enums::InputBotInlineMessageId,
4370 new_text: &str,
4371 reply_markup: Option<tl::enums::ReplyMarkup>,
4372 ) -> Result<bool, InvocationError> {
4373 let req = tl::functions::messages::EditInlineBotMessage {
4374 no_webpage: false,
4375 invert_media: false,
4376 id,
4377 message: Some(new_text.to_string()),
4378 media: None,
4379 reply_markup,
4380 entities: None,
4381 };
4382 let body = self.rpc_call_raw(&req).await?;
4383 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4385 }
4386
4387 pub async fn answer_callback_query(
4389 &self,
4390 query_id: i64,
4391 text: Option<&str>,
4392 alert: bool,
4393 ) -> Result<bool, InvocationError> {
4394 let req = tl::functions::messages::SetBotCallbackAnswer {
4395 alert,
4396 query_id,
4397 message: text.map(|s| s.to_string()),
4398 url: None,
4399 cache_time: 0,
4400 };
4401 let body = self.rpc_call_raw(&req).await?;
4402 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4403 }
4404
4405 pub async fn answer_inline_query(
4406 &self,
4407 query_id: i64,
4408 results: Vec<tl::enums::InputBotInlineResult>,
4409 cache_time: i32,
4410 is_personal: bool,
4411 next_offset: Option<String>,
4412 ) -> Result<bool, InvocationError> {
4413 let req = tl::functions::messages::SetInlineBotResults {
4414 gallery: false,
4415 private: is_personal,
4416 query_id,
4417 results,
4418 cache_time,
4419 next_offset,
4420 switch_pm: None,
4421 switch_webview: None,
4422 };
4423 let body = self.rpc_call_raw(&req).await?;
4424 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
4425 }
4426
4427 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
4431 let req = tl::functions::messages::GetDialogs {
4432 exclude_pinned: false,
4433 folder_id: None,
4434 offset_date: 0,
4435 offset_id: 0,
4436 offset_peer: tl::enums::InputPeer::Empty,
4437 limit,
4438 hash: 0,
4439 };
4440
4441 let body = self.rpc_call_raw(&req).await?;
4442 let mut cur = Cursor::from_slice(&body);
4443 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4444 tl::enums::messages::Dialogs::Dialogs(d) => d,
4445 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
4446 dialogs: d.dialogs,
4447 messages: d.messages,
4448 chats: d.chats,
4449 users: d.users,
4450 },
4451 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
4452 };
4453
4454 let msg_map: HashMap<i32, tl::enums::Message> = raw
4456 .messages
4457 .into_iter()
4458 .map(|m| {
4459 let id = match &m {
4460 tl::enums::Message::Message(x) => x.id,
4461 tl::enums::Message::Service(x) => x.id,
4462 tl::enums::Message::Empty(x) => x.id,
4463 };
4464 (id, m)
4465 })
4466 .collect();
4467
4468 let user_map: HashMap<i64, tl::enums::User> = raw
4470 .users
4471 .into_iter()
4472 .filter_map(|u| {
4473 if let tl::enums::User::User(ref uu) = u {
4474 Some((uu.id, u))
4475 } else {
4476 None
4477 }
4478 })
4479 .collect();
4480
4481 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4483 .chats
4484 .into_iter()
4485 .map(|c| {
4486 let id = match &c {
4487 tl::enums::Chat::Chat(x) => x.id,
4488 tl::enums::Chat::Forbidden(x) => x.id,
4489 tl::enums::Chat::Channel(x) => x.id,
4490 tl::enums::Chat::ChannelForbidden(x) => x.id,
4491 tl::enums::Chat::Empty(x) => x.id,
4492 };
4493 (id, c)
4494 })
4495 .collect();
4496
4497 {
4499 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4500 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4501 self.cache_users_and_chats(&u_list, &c_list).await;
4502 }
4503
4504 let result = raw
4505 .dialogs
4506 .into_iter()
4507 .map(|d| {
4508 let top_id = match &d {
4509 tl::enums::Dialog::Dialog(x) => x.top_message,
4510 _ => 0,
4511 };
4512 let peer = match &d {
4513 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
4514 _ => None,
4515 };
4516
4517 let message = msg_map.get(&top_id).cloned();
4518 let entity = peer.and_then(|p| match p {
4519 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
4520 _ => None,
4521 });
4522 let chat = peer.and_then(|p| match p {
4523 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
4524 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
4525 _ => None,
4526 });
4527
4528 Dialog {
4529 raw: d,
4530 message,
4531 entity,
4532 chat,
4533 }
4534 })
4535 .collect();
4536
4537 Ok(result)
4538 }
4539
4540 #[allow(dead_code)]
4542 async fn get_dialogs_raw(
4543 &self,
4544 req: tl::functions::messages::GetDialogs,
4545 ) -> Result<Vec<Dialog>, InvocationError> {
4546 let body = self.rpc_call_raw(&req).await?;
4547 let mut cur = Cursor::from_slice(&body);
4548 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4549 tl::enums::messages::Dialogs::Dialogs(d) => d,
4550 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
4551 dialogs: d.dialogs,
4552 messages: d.messages,
4553 chats: d.chats,
4554 users: d.users,
4555 },
4556 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
4557 };
4558
4559 let msg_map: HashMap<i32, tl::enums::Message> = raw
4560 .messages
4561 .into_iter()
4562 .map(|m| {
4563 let id = match &m {
4564 tl::enums::Message::Message(x) => x.id,
4565 tl::enums::Message::Service(x) => x.id,
4566 tl::enums::Message::Empty(x) => x.id,
4567 };
4568 (id, m)
4569 })
4570 .collect();
4571
4572 let user_map: HashMap<i64, tl::enums::User> = raw
4573 .users
4574 .into_iter()
4575 .filter_map(|u| {
4576 if let tl::enums::User::User(ref uu) = u {
4577 Some((uu.id, u))
4578 } else {
4579 None
4580 }
4581 })
4582 .collect();
4583
4584 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4585 .chats
4586 .into_iter()
4587 .map(|c| {
4588 let id = match &c {
4589 tl::enums::Chat::Chat(x) => x.id,
4590 tl::enums::Chat::Forbidden(x) => x.id,
4591 tl::enums::Chat::Channel(x) => x.id,
4592 tl::enums::Chat::ChannelForbidden(x) => x.id,
4593 tl::enums::Chat::Empty(x) => x.id,
4594 };
4595 (id, c)
4596 })
4597 .collect();
4598
4599 {
4600 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4601 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4602 self.cache_users_and_chats(&u_list, &c_list).await;
4603 }
4604
4605 let result = raw
4606 .dialogs
4607 .into_iter()
4608 .map(|d| {
4609 let top_id = match &d {
4610 tl::enums::Dialog::Dialog(x) => x.top_message,
4611 _ => 0,
4612 };
4613 let peer = match &d {
4614 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
4615 _ => None,
4616 };
4617
4618 let message = msg_map.get(&top_id).cloned();
4619 let entity = peer.and_then(|p| match p {
4620 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
4621 _ => None,
4622 });
4623 let chat = peer.and_then(|p| match p {
4624 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
4625 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
4626 _ => None,
4627 });
4628
4629 Dialog {
4630 raw: d,
4631 message,
4632 entity,
4633 chat,
4634 }
4635 })
4636 .collect();
4637
4638 Ok(result)
4639 }
4640
4641 async fn get_dialogs_raw_with_count(
4643 &self,
4644 req: tl::functions::messages::GetDialogs,
4645 ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
4646 let body = self.rpc_call_raw(&req).await?;
4647 let mut cur = Cursor::from_slice(&body);
4648 let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
4649 tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
4650 tl::enums::messages::Dialogs::Slice(d) => {
4651 let cnt = Some(d.count);
4652 (
4653 tl::types::messages::Dialogs {
4654 dialogs: d.dialogs,
4655 messages: d.messages,
4656 chats: d.chats,
4657 users: d.users,
4658 },
4659 cnt,
4660 )
4661 }
4662 tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
4663 };
4664
4665 let msg_map: HashMap<i32, tl::enums::Message> = raw
4666 .messages
4667 .into_iter()
4668 .map(|m| {
4669 let id = match &m {
4670 tl::enums::Message::Message(x) => x.id,
4671 tl::enums::Message::Service(x) => x.id,
4672 tl::enums::Message::Empty(x) => x.id,
4673 };
4674 (id, m)
4675 })
4676 .collect();
4677
4678 let user_map: HashMap<i64, tl::enums::User> = raw
4679 .users
4680 .into_iter()
4681 .filter_map(|u| {
4682 if let tl::enums::User::User(ref uu) = u {
4683 Some((uu.id, u))
4684 } else {
4685 None
4686 }
4687 })
4688 .collect();
4689
4690 let chat_map: HashMap<i64, tl::enums::Chat> = raw
4691 .chats
4692 .into_iter()
4693 .map(|c| {
4694 let id = match &c {
4695 tl::enums::Chat::Chat(x) => x.id,
4696 tl::enums::Chat::Forbidden(x) => x.id,
4697 tl::enums::Chat::Channel(x) => x.id,
4698 tl::enums::Chat::ChannelForbidden(x) => x.id,
4699 tl::enums::Chat::Empty(x) => x.id,
4700 };
4701 (id, c)
4702 })
4703 .collect();
4704
4705 {
4706 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
4707 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
4708 self.cache_users_and_chats(&u_list, &c_list).await;
4709 }
4710
4711 let result = raw
4712 .dialogs
4713 .into_iter()
4714 .map(|d| {
4715 let top_id = match &d {
4716 tl::enums::Dialog::Dialog(x) => x.top_message,
4717 _ => 0,
4718 };
4719 let peer = match &d {
4720 tl::enums::Dialog::Dialog(x) => Some(&x.peer),
4721 _ => None,
4722 };
4723 let message = msg_map.get(&top_id).cloned();
4724 let entity = peer.and_then(|p| match p {
4725 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
4726 _ => None,
4727 });
4728 let chat = peer.and_then(|p| match p {
4729 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
4730 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
4731 _ => None,
4732 });
4733 Dialog {
4734 raw: d,
4735 message,
4736 entity,
4737 chat,
4738 }
4739 })
4740 .collect();
4741
4742 Ok((result, count))
4743 }
4744
4745 async fn get_messages_with_count(
4747 &self,
4748 peer: tl::enums::InputPeer,
4749 limit: i32,
4750 offset_id: i32,
4751 ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
4752 let req = tl::functions::messages::GetHistory {
4753 peer,
4754 offset_id,
4755 offset_date: 0,
4756 add_offset: 0,
4757 limit,
4758 max_id: 0,
4759 min_id: 0,
4760 hash: 0,
4761 };
4762 let body = self.rpc_call_raw(&req).await?;
4763 let mut cur = Cursor::from_slice(&body);
4764 let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4765 tl::enums::messages::Messages::Messages(m) => (m.messages, None),
4766 tl::enums::messages::Messages::Slice(m) => {
4767 let cnt = Some(m.count);
4768 (m.messages, cnt)
4769 }
4770 tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
4771 tl::enums::messages::Messages::NotModified(_) => (vec![], None),
4772 };
4773 Ok((
4774 msgs.into_iter()
4775 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4776 .collect(),
4777 count,
4778 ))
4779 }
4780
4781 pub async fn download_media_to_file(
4792 &self,
4793 location: tl::enums::InputFileLocation,
4794 path: impl AsRef<std::path::Path>,
4795 ) -> Result<(), InvocationError> {
4796 let bytes = self.download_media(location).await?;
4797 std::fs::write(path, &bytes).map_err(InvocationError::Io)?;
4798 Ok(())
4799 }
4800
4801 pub async fn delete_dialog(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
4802 let peer = peer.into().resolve(self).await?;
4803 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4804 let req = tl::functions::messages::DeleteHistory {
4805 just_clear: false,
4806 revoke: false,
4807 peer: input_peer,
4808 max_id: 0,
4809 min_date: None,
4810 max_date: None,
4811 };
4812 self.rpc_write(&req).await
4813 }
4814
4815 pub async fn mark_as_read(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
4817 let peer = peer.into().resolve(self).await?;
4818 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4819 match &input_peer {
4820 tl::enums::InputPeer::Channel(c) => {
4821 let req = tl::functions::channels::ReadHistory {
4822 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4823 channel_id: c.channel_id,
4824 access_hash: c.access_hash,
4825 }),
4826 max_id: 0,
4827 };
4828 self.rpc_call_raw(&req).await?;
4829 }
4830 _ => {
4831 let req = tl::functions::messages::ReadHistory {
4832 peer: input_peer,
4833 max_id: 0,
4834 };
4835 self.rpc_call_raw(&req).await?;
4836 }
4837 }
4838 Ok(())
4839 }
4840
4841 pub async fn clear_mentions(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
4843 let peer = peer.into().resolve(self).await?;
4844 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4845 let req = tl::functions::messages::ReadMentions {
4846 peer: input_peer,
4847 top_msg_id: None,
4848 };
4849 self.rpc_write(&req).await
4850 }
4851
4852 pub async fn send_chat_action(
4860 &self,
4861 peer: impl Into<PeerRef>,
4862 action: tl::enums::SendMessageAction,
4863 ) -> Result<(), InvocationError> {
4864 let peer = peer.into().resolve(self).await?;
4865 self.send_chat_action_ex(peer, action, None).await
4866 }
4867
4868 pub async fn join_chat(&self, peer: impl Into<PeerRef>) -> Result<(), InvocationError> {
4872 let peer = peer.into().resolve(self).await?;
4873 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
4874 match input_peer {
4875 tl::enums::InputPeer::Channel(c) => {
4876 let req = tl::functions::channels::JoinChannel {
4877 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
4878 channel_id: c.channel_id,
4879 access_hash: c.access_hash,
4880 }),
4881 };
4882 self.rpc_call_raw(&req).await?;
4883 }
4884 tl::enums::InputPeer::Chat(c) => {
4885 let req = tl::functions::messages::AddChatUser {
4886 chat_id: c.chat_id,
4887 user_id: tl::enums::InputUser::UserSelf,
4888 fwd_limit: 0,
4889 };
4890 self.rpc_call_raw(&req).await?;
4891 }
4892 _ => {
4893 return Err(InvocationError::Deserialize(
4894 "cannot join this peer type".into(),
4895 ));
4896 }
4897 }
4898 Ok(())
4899 }
4900
4901 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
4903 let hash = Self::parse_invite_hash(link)
4904 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
4905 let req = tl::functions::messages::ImportChatInvite {
4906 hash: hash.to_string(),
4907 };
4908 self.rpc_write(&req).await
4909 }
4910
/// Extracts the invite hash from a chat invite link.
///
/// Recognized forms, checked in this order:
/// * `…/+<hash>`
/// * `…/joinchat/<hash>`
/// * `…join?invite=<hash>` (the `tg://` deep-link form)
///
/// Surrounding whitespace is ignored, and the hash stops at the first
/// `?`, `#`, `/` or `&`, so a trailing slash, query string or fragment is
/// not passed on as part of the hash.
///
/// Returns `None` when no marker is present or the hash is empty.
pub fn parse_invite_hash(link: &str) -> Option<&str> {
    // Order matters: it preserves the historical precedence of `/+` over
    // `/joinchat/` when a link happens to contain both.
    const MARKERS: [&str; 3] = ["/+", "/joinchat/", "join?invite="];

    let link = link.trim();
    let tail = MARKERS
        .iter()
        .find_map(|marker| link.find(marker).map(|pos| &link[pos + marker.len()..]))?;

    // Cut off anything that cannot belong to the hash itself.
    let end = tail.find(['?', '#', '/', '&']).unwrap_or(tail.len());
    let hash = &tail[..end];

    (!hash.is_empty()).then_some(hash)
}
4921
4922 pub async fn get_messages(
4926 &self,
4927 peer: tl::enums::InputPeer,
4928 limit: i32,
4929 offset_id: i32,
4930 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
4931 let req = tl::functions::messages::GetHistory {
4932 peer,
4933 offset_id,
4934 offset_date: 0,
4935 add_offset: 0,
4936 limit,
4937 max_id: 0,
4938 min_id: 0,
4939 hash: 0,
4940 };
4941 let body = self.rpc_call_raw(&req).await?;
4942 let mut cur = Cursor::from_slice(&body);
4943 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
4944 tl::enums::messages::Messages::Messages(m) => m.messages,
4945 tl::enums::messages::Messages::Slice(m) => m.messages,
4946 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
4947 tl::enums::messages::Messages::NotModified(_) => vec![],
4948 };
4949 Ok(msgs
4950 .into_iter()
4951 .map(|m| update::IncomingMessage::from_raw(m).with_client(self.clone()))
4952 .collect())
4953 }
4954
4955 pub async fn resolve_peer(&self, peer: &str) -> Result<tl::enums::Peer, InvocationError> {
4959 match peer.trim() {
4960 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
4961 username if username.starts_with('@') => self.resolve_username(&username[1..]).await,
4962 id_str => {
4963 if let Ok(id) = id_str.parse::<i64>() {
4964 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
4965 } else {
4966 Err(InvocationError::Deserialize(format!(
4967 "cannot resolve peer: {peer}"
4968 )))
4969 }
4970 }
4971 }
4972 }
4973
4974 pub async fn resolve_username(
4978 &self,
4979 username: &str,
4980 ) -> Result<tl::enums::Peer, InvocationError> {
4981 let req = tl::functions::contacts::ResolveUsername {
4982 username: username.to_string(),
4983 referer: None,
4984 };
4985 let body = self.rpc_call_raw(&req).await?;
4986 let mut cur = Cursor::from_slice(&body);
4987 let tl::enums::contacts::ResolvedPeer::ResolvedPeer(resolved) =
4988 tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)?;
4989 self.cache_users_slice(&resolved.users).await;
4991 self.cache_chats_slice(&resolved.chats).await;
4992 Ok(resolved.peer)
4993 }
4994
4995 fn spawn_salt_fetch_if_needed(&self) {
5005 if self
5006 .inner
5007 .salt_request_in_flight
5008 .compare_exchange(
5009 false,
5010 true,
5011 std::sync::atomic::Ordering::SeqCst,
5012 std::sync::atomic::Ordering::SeqCst,
5013 )
5014 .is_err()
5015 {
5016 return; }
5018 let inner = Arc::clone(&self.inner);
5019 tokio::spawn(async move {
5020 tracing::debug!("[layer] proactive GetFutureSalts spawned");
5021 let mut req_body = Vec::with_capacity(8);
5022 req_body.extend_from_slice(&0xb921bd04_u32.to_le_bytes()); req_body.extend_from_slice(&64_i32.to_le_bytes()); let (wire, fk, fs_msg_id) = {
5025 let mut w = inner.writer.lock().await;
5026 let fk = w.frame_kind.clone();
5027 let (wire, id) = w.enc.pack_body_with_msg_id(&req_body, true);
5028 w.sent_bodies.insert(id, req_body);
5029 (wire, fk, id)
5030 };
5031 let (tx, rx) = tokio::sync::oneshot::channel();
5032 inner.pending.lock().await.insert(fs_msg_id, tx);
5033 let send_ok = {
5034 send_frame_write(&mut *inner.write_half.lock().await, &wire, &fk)
5035 .await
5036 .is_ok()
5037 };
5038 if !send_ok {
5039 inner.pending.lock().await.remove(&fs_msg_id);
5040 inner.writer.lock().await.sent_bodies.remove(&fs_msg_id);
5041 inner
5042 .salt_request_in_flight
5043 .store(false, std::sync::atomic::Ordering::SeqCst);
5044 return;
5045 }
5046 let _ = rx.await;
5047 inner
5048 .salt_request_in_flight
5049 .store(false, std::sync::atomic::Ordering::SeqCst);
5050 });
5051 }
5052
5053 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
5054 let body = self.rpc_call_raw(req).await?;
5055 let mut cur = Cursor::from_slice(&body);
5056 R::Return::deserialize(&mut cur).map_err(Into::into)
5057 }
5058
5059 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
5060 let mut rl = RetryLoop::new(Arc::clone(&self.inner.retry_policy));
5061 let mut auth_key_retries = 0u32;
5062 loop {
5063 match self.do_rpc_call(req).await {
5064 Ok(body) => return Ok(body),
5065 Err(e) if e.migrate_dc_id().is_some() => {
5066 self.migrate_to(e.migrate_dc_id().unwrap()).await?;
5069 }
5070 Err(InvocationError::Rpc(ref r)) if r.code == 401 => {
5077 auth_key_retries += 1;
5078 if auth_key_retries > 3 {
5079 return Err(InvocationError::Rpc(r.clone()));
5080 }
5081 tracing::warn!(
5082 "[layer] AUTH_KEY_UNREGISTERED on invoke (attempt {}/3): waiting for reader reconnect",
5083 auth_key_retries
5084 );
5085 rl.advance(InvocationError::Io(std::io::Error::new(
5086 std::io::ErrorKind::ConnectionReset,
5087 r.to_string(),
5088 )))
5089 .await?;
5090 }
5091 Err(e) => rl.advance(e).await?,
5092 }
5093 }
5094 }
5095
5096 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
5106 let (tx, rx) = oneshot::channel();
5107 let wire = {
5108 let raw_body = req.to_bytes();
5109 let body = maybe_gz_pack(&raw_body);
5111
5112 let mut w = self.inner.writer.lock().await;
5113
5114 if w.advance_salt_if_needed() {
5118 drop(w); self.spawn_salt_fetch_if_needed();
5120 w = self.inner.writer.lock().await;
5121 }
5122
5123 let fk = w.frame_kind.clone();
5124
5125 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
5128
5129 if acks.is_empty() {
5130 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5132 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5134 (wire, fk)
5135 } else {
5136 let ack_body = build_msgs_ack_body(&acks);
5138 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false); let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true); let container_payload = build_container_body(&[
5142 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5143 (req_msg_id, req_seqno, body.as_slice()),
5144 ]);
5145
5146 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5147
5148 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5151 tracing::debug!(
5152 "[layer] container: bundled {} acks + request (cid={container_msg_id})",
5153 acks.len()
5154 );
5155 (wire, fk)
5156 }
5157 };
5159 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5161 match rx.await {
5162 Ok(result) => result,
5163 Err(_) => Err(InvocationError::Deserialize(
5164 "RPC channel closed (reader died?)".into(),
5165 )),
5166 }
5167 }
5168
5169 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
5172 let mut fail_count = NonZeroU32::new(1).unwrap();
5173 let mut slept_so_far = Duration::default();
5174 loop {
5175 let result = self.do_rpc_write(req).await;
5176 match result {
5177 Ok(()) => return Ok(()),
5178 Err(e) => {
5179 let ctx = RetryContext {
5180 fail_count,
5181 slept_so_far,
5182 error: e,
5183 };
5184 match self.inner.retry_policy.should_retry(&ctx) {
5185 ControlFlow::Continue(delay) => {
5186 sleep(delay).await;
5187 slept_so_far += delay;
5188 fail_count = fail_count.saturating_add(1);
5189 }
5190 ControlFlow::Break(()) => return Err(ctx.error),
5191 }
5192 }
5193 }
5194 }
5195 }
5196
5197 async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
5198 let (tx, rx) = oneshot::channel();
5199 let wire = {
5200 let raw_body = req.to_bytes();
5201 let body = maybe_gz_pack(&raw_body);
5203
5204 let mut w = self.inner.writer.lock().await;
5205 let fk = w.frame_kind.clone();
5206
5207 let acks: Vec<i64> = w.pending_ack.drain(..).collect();
5209
5210 if acks.is_empty() {
5211 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5212 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5214 (wire, fk)
5215 } else {
5216 let ack_body = build_msgs_ack_body(&acks);
5217 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
5218 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
5219 let container_payload = build_container_body(&[
5220 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5221 (req_msg_id, req_seqno, body.as_slice()),
5222 ]);
5223 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5224 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5227 tracing::debug!(
5228 "[layer] write container: bundled {} acks + write (cid={container_msg_id})",
5229 acks.len()
5230 );
5231 (wire, fk)
5232 }
5233 };
5235 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5236 match rx.await {
5237 Ok(result) => result.map(|_| ()),
5238 Err(_) => Err(InvocationError::Deserialize(
5239 "rpc_write channel closed".into(),
5240 )),
5241 }
5242 }
5243
5244 async fn init_connection(&self) -> Result<(), InvocationError> {
5247 use tl::functions::{InitConnection, InvokeWithLayer, help::GetConfig};
5248 let req = InvokeWithLayer {
5249 layer: tl::LAYER,
5250 query: InitConnection {
5251 api_id: self.inner.api_id,
5252 device_model: "Linux".to_string(),
5253 system_version: "1.0".to_string(),
5254 app_version: env!("CARGO_PKG_VERSION").to_string(),
5255 system_lang_code: "en".to_string(),
5256 lang_pack: "".to_string(),
5257 lang_code: "en".to_string(),
5258 proxy: None,
5259 params: None,
5260 query: GetConfig {},
5261 },
5262 };
5263
5264 let body = self.rpc_call_raw_serializable(&req).await?;
5266
5267 let mut cur = Cursor::from_slice(&body);
5268 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
5269 let allow_ipv6 = self.inner.allow_ipv6;
5270 let mut opts = self.inner.dc_options.lock().await;
5271 let mut media_opts = self.inner.media_dc_options.lock().await;
5272 for opt in &cfg.dc_options {
5273 let tl::enums::DcOption::DcOption(o) = opt;
5274 if o.ipv6 && !allow_ipv6 {
5275 continue;
5276 }
5277 let addr = format!("{}:{}", o.ip_address, o.port);
5278 let mut flags = DcFlags::NONE;
5279 if o.ipv6 {
5280 flags.set(DcFlags::IPV6);
5281 }
5282 if o.media_only {
5283 flags.set(DcFlags::MEDIA_ONLY);
5284 }
5285 if o.tcpo_only {
5286 flags.set(DcFlags::TCPO_ONLY);
5287 }
5288 if o.cdn {
5289 flags.set(DcFlags::CDN);
5290 }
5291 if o.r#static {
5292 flags.set(DcFlags::STATIC);
5293 }
5294
5295 if o.media_only || o.cdn {
5296 let e = media_opts.entry(o.id).or_insert_with(|| DcEntry {
5297 dc_id: o.id,
5298 addr: addr.clone(),
5299 auth_key: None,
5300 first_salt: 0,
5301 time_offset: 0,
5302 flags,
5303 });
5304 e.addr = addr;
5305 e.flags = flags;
5306 } else if !o.tcpo_only {
5307 let e = opts.entry(o.id).or_insert_with(|| DcEntry {
5308 dc_id: o.id,
5309 addr: addr.clone(),
5310 auth_key: None,
5311 first_salt: 0,
5312 time_offset: 0,
5313 flags,
5314 });
5315 e.addr = addr;
5316 e.flags = flags;
5317 }
5318 }
5319 tracing::info!(
5320 "[layer] initConnection ✓ ({} DCs, ipv6={})",
5321 cfg.dc_options.len(),
5322 allow_ipv6
5323 );
5324 }
5325 Ok(())
5326 }
5327
5328 async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
5331 let addr = {
5332 let opts = self.inner.dc_options.lock().await;
5333 opts.get(&new_dc_id)
5334 .map(|e| e.addr.clone())
5335 .unwrap_or_else(|| crate::dc_migration::fallback_dc_addr(new_dc_id).to_string())
5336 };
5337 tracing::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
5338
5339 let saved_key = {
5340 let opts = self.inner.dc_options.lock().await;
5341 opts.get(&new_dc_id).and_then(|e| e.auth_key)
5342 };
5343
5344 let socks5 = self.inner.socks5.clone();
5345 let mtproxy = self.inner.mtproxy.clone();
5346 let transport = self.inner.transport.clone();
5347 let conn = if let Some(key) = saved_key {
5348 Connection::connect_with_key(
5349 &addr,
5350 key,
5351 0,
5352 0,
5353 socks5.as_ref(),
5354 &transport,
5355 new_dc_id as i16,
5356 )
5357 .await?
5358 } else {
5359 Connection::connect_raw(
5360 &addr,
5361 socks5.as_ref(),
5362 mtproxy.as_ref(),
5363 &transport,
5364 new_dc_id as i16,
5365 )
5366 .await?
5367 };
5368
5369 let new_key = conn.auth_key_bytes();
5370 {
5371 let mut opts = self.inner.dc_options.lock().await;
5372 let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
5373 dc_id: new_dc_id,
5374 addr: addr.clone(),
5375 auth_key: None,
5376 first_salt: 0,
5377 time_offset: 0,
5378 flags: DcFlags::NONE,
5379 });
5380 entry.auth_key = Some(new_key);
5381 }
5382
5383 let (new_writer, new_wh, new_read, new_fk) = conn.into_writer();
5385 let new_ak = new_writer.enc.auth_key_bytes();
5386 let new_sid = new_writer.enc.session_id();
5387 *self.inner.writer.lock().await = new_writer;
5388 *self.inner.write_half.lock().await = new_wh;
5389 *self.inner.home_dc_id.lock().await = new_dc_id;
5390
5391 let _ = self
5394 .inner
5395 .reconnect_tx
5396 .send((new_read, new_fk, new_ak, new_sid));
5397
5398 loop {
5408 match self.init_connection().await {
5409 Ok(()) => break,
5410 Err(InvocationError::Rpc(ref r)) if r.flood_wait_seconds().is_some() => {
5411 let secs = r.flood_wait_seconds().unwrap();
5412 tracing::warn!(
5413 "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs}: waiting"
5414 );
5415 sleep(Duration::from_secs(secs + 1)).await;
5416 }
5417 Err(e) => return Err(e),
5418 }
5419 }
5420
5421 self.save_session().await.ok();
5422 tracing::info!("[layer] Now on DC{new_dc_id} ✓");
5423 Ok(())
5424 }
5425
5426 pub fn disconnect(&self) {
5436 self.inner.shutdown_token.cancel();
5437 }
5438
5439 pub async fn sync_update_state(&self) {
5447 let _ = self.sync_pts_state().await;
5448 }
5449
5450 async fn cache_user(&self, user: &tl::enums::User) {
5453 self.inner.peer_cache.write().await.cache_user(user);
5454 }
5455
5456 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
5457 let mut cache = self.inner.peer_cache.write().await;
5458 cache.cache_users(users);
5459 }
5460
5461 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
5462 let mut cache = self.inner.peer_cache.write().await;
5463 cache.cache_chats(chats);
5464 }
5465
5466 async fn cache_users_and_chats(&self, users: &[tl::enums::User], chats: &[tl::enums::Chat]) {
5468 let mut cache = self.inner.peer_cache.write().await;
5469 cache.cache_users(users);
5470 cache.cache_chats(chats);
5471 }
5472
5473 #[doc(hidden)]
5475 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
5476 self.cache_users_slice(users).await;
5477 }
5478
5479 #[doc(hidden)]
5480 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
5481 self.cache_chats_slice(chats).await;
5482 }
5483
5484 #[doc(hidden)]
5486 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(
5487 &self,
5488 req: &R,
5489 ) -> Result<Vec<u8>, InvocationError> {
5490 self.rpc_call_raw(req).await
5491 }
5492
5493 async fn rpc_call_raw_serializable<S: tl::Serializable>(
5495 &self,
5496 req: &S,
5497 ) -> Result<Vec<u8>, InvocationError> {
5498 let mut fail_count = NonZeroU32::new(1).unwrap();
5499 let mut slept_so_far = Duration::default();
5500 loop {
5501 match self.do_rpc_write_returning_body(req).await {
5502 Ok(body) => return Ok(body),
5503 Err(e) => {
5504 let ctx = RetryContext {
5505 fail_count,
5506 slept_so_far,
5507 error: e,
5508 };
5509 match self.inner.retry_policy.should_retry(&ctx) {
5510 ControlFlow::Continue(delay) => {
5511 sleep(delay).await;
5512 slept_so_far += delay;
5513 fail_count = fail_count.saturating_add(1);
5514 }
5515 ControlFlow::Break(()) => return Err(ctx.error),
5516 }
5517 }
5518 }
5519 }
5520 }
5521
5522 async fn do_rpc_write_returning_body<S: tl::Serializable>(
5523 &self,
5524 req: &S,
5525 ) -> Result<Vec<u8>, InvocationError> {
5526 let (tx, rx) = oneshot::channel();
5527 let wire = {
5528 let raw_body = req.to_bytes();
5529 let body = maybe_gz_pack(&raw_body); let mut w = self.inner.writer.lock().await;
5531 let fk = w.frame_kind.clone();
5532 let acks: Vec<i64> = w.pending_ack.drain(..).collect(); if acks.is_empty() {
5534 let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
5535 w.sent_bodies.insert(msg_id, body); self.inner.pending.lock().await.insert(msg_id, tx);
5537 (wire, fk)
5538 } else {
5539 let ack_body = build_msgs_ack_body(&acks);
5540 let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
5541 let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
5542 let container_payload = build_container_body(&[
5543 (ack_msg_id, ack_seqno, ack_body.as_slice()),
5544 (req_msg_id, req_seqno, body.as_slice()),
5545 ]);
5546 let (wire, container_msg_id) = w.enc.pack_container(&container_payload);
5547 w.sent_bodies.insert(req_msg_id, body); w.container_map.insert(container_msg_id, req_msg_id); self.inner.pending.lock().await.insert(req_msg_id, tx);
5550 (wire, fk)
5551 }
5552 };
5554 send_frame_write(&mut *self.inner.write_half.lock().await, &wire.0, &wire.1).await?;
5555 match rx.await {
5556 Ok(result) => result,
5557 Err(_) => Err(InvocationError::Deserialize("rpc channel closed".into())),
5558 }
5559 }
5560
5561 pub async fn count_channels(&self) -> Result<usize, InvocationError> {
5564 let mut iter = self.iter_dialogs();
5565 let mut count = 0usize;
5566 while let Some(dialog) = iter.next(self).await? {
5567 if matches!(dialog.peer(), Some(tl::enums::Peer::Channel(_))) {
5568 count += 1;
5569 }
5570 }
5571 Ok(count)
5572 }
5573
5574 pub fn iter_dialogs(&self) -> DialogIter {
5588 DialogIter {
5589 offset_date: 0,
5590 offset_id: 0,
5591 offset_peer: tl::enums::InputPeer::Empty,
5592 done: false,
5593 buffer: VecDeque::new(),
5594 total: None,
5595 }
5596 }
5597
5598 pub fn iter_messages(&self, peer: impl Into<PeerRef>) -> MessageIter {
5612 MessageIter {
5613 unresolved: Some(peer.into()),
5614 peer: None,
5615 offset_id: 0,
5616 done: false,
5617 buffer: VecDeque::new(),
5618 total: None,
5619 }
5620 }
5621
5622 pub async fn resolve_to_input_peer(
5627 &self,
5628 peer: &tl::enums::Peer,
5629 ) -> Result<tl::enums::InputPeer, InvocationError> {
5630 let cache = self.inner.peer_cache.read().await;
5631 match peer {
5632 tl::enums::Peer::User(u) => {
5633 if u.user_id == 0 {
5634 return Ok(tl::enums::InputPeer::PeerSelf);
5635 }
5636 match cache.users.get(&u.user_id) {
5637 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
5638 user_id: u.user_id,
5639 access_hash: hash,
5640 })),
5641 None => Err(InvocationError::Deserialize(format!(
5642 "access_hash unknown for user {}; resolve via username first",
5643 u.user_id
5644 ))),
5645 }
5646 }
5647 tl::enums::Peer::Chat(c) => Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat {
5648 chat_id: c.chat_id,
5649 })),
5650 tl::enums::Peer::Channel(c) => match cache.channels.get(&c.channel_id) {
5651 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
5652 channel_id: c.channel_id,
5653 access_hash: hash,
5654 })),
5655 None => Err(InvocationError::Deserialize(format!(
5656 "access_hash unknown for channel {}; resolve via username first",
5657 c.channel_id
5658 ))),
5659 },
5660 }
5661 }
5662
5663 pub async fn invoke_on_dc<R: RemoteCall>(
5671 &self,
5672 dc_id: i32,
5673 req: &R,
5674 ) -> Result<R::Return, InvocationError> {
5675 let body = self.rpc_on_dc_raw(dc_id, req).await?;
5676 let mut cur = Cursor::from_slice(&body);
5677 R::Return::deserialize(&mut cur).map_err(Into::into)
5678 }
5679
5680 async fn rpc_on_dc_raw<R: RemoteCall>(
5682 &self,
5683 dc_id: i32,
5684 req: &R,
5685 ) -> Result<Vec<u8>, InvocationError> {
5686 let needs_new = {
5688 let pool = self.inner.dc_pool.lock().await;
5689 !pool.has_connection(dc_id)
5690 };
5691
5692 if needs_new {
5693 let addr = {
5694 let opts = self.inner.dc_options.lock().await;
5695 opts.get(&dc_id)
5696 .map(|e| e.addr.clone())
5697 .unwrap_or_else(|| crate::dc_migration::fallback_dc_addr(dc_id).to_string())
5698 };
5699
5700 let socks5 = self.inner.socks5.clone();
5701 let transport = self.inner.transport.clone();
5702 let saved_key = {
5703 let opts = self.inner.dc_options.lock().await;
5704 opts.get(&dc_id).and_then(|e| e.auth_key)
5705 };
5706
5707 let dc_conn = if let Some(key) = saved_key {
5708 dc_pool::DcConnection::connect_with_key(
5709 &addr,
5710 key,
5711 0,
5712 0,
5713 socks5.as_ref(),
5714 &transport,
5715 dc_id as i16,
5716 )
5717 .await?
5718 } else {
5719 let conn = dc_pool::DcConnection::connect_raw(
5720 &addr,
5721 socks5.as_ref(),
5722 &transport,
5723 dc_id as i16,
5724 )
5725 .await?;
5726 let home_dc_id = *self.inner.home_dc_id.lock().await;
5728 if dc_id != home_dc_id
5729 && let Err(e) = self.export_import_auth(dc_id, &conn).await
5730 {
5731 tracing::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
5732 }
5733 conn
5734 };
5735
5736 let key = dc_conn.auth_key_bytes();
5737 {
5738 let mut opts = self.inner.dc_options.lock().await;
5739 if let Some(e) = opts.get_mut(&dc_id) {
5740 e.auth_key = Some(key);
5741 }
5742 }
5743 self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
5744 }
5745
5746 let dc_entries: Vec<DcEntry> = self
5747 .inner
5748 .dc_options
5749 .lock()
5750 .await
5751 .values()
5752 .cloned()
5753 .collect();
5754 self.inner
5755 .dc_pool
5756 .lock()
5757 .await
5758 .invoke_on_dc(dc_id, &dc_entries, req)
5759 .await
5760 }
5761
5762 async fn export_import_auth(
5764 &self,
5765 dc_id: i32,
5766 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
5768 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
5770 let body = self.rpc_call_raw(&export_req).await?;
5771 let mut cur = Cursor::from_slice(&body);
5772 let tl::enums::auth::ExportedAuthorization::ExportedAuthorization(exported) =
5773 tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)?;
5774
5775 let import_req = tl::functions::auth::ImportAuthorization {
5777 id: exported.id,
5778 bytes: exported.bytes,
5779 };
5780 let dc_entries: Vec<DcEntry> = self
5781 .inner
5782 .dc_options
5783 .lock()
5784 .await
5785 .values()
5786 .cloned()
5787 .collect();
5788 self.inner
5789 .dc_pool
5790 .lock()
5791 .await
5792 .invoke_on_dc(dc_id, &dc_entries, &import_req)
5793 .await?;
5794 tracing::debug!("[layer] Auth exported+imported to DC{dc_id} ✓");
5795 Ok(())
5796 }
5797
5798 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
5801 let body = self
5802 .rpc_call_raw(&tl::functions::account::GetPassword {})
5803 .await?;
5804 let mut cur = Cursor::from_slice(&body);
5805 let tl::enums::account::Password::Password(pw) =
5806 tl::enums::account::Password::deserialize(&mut cur)?;
5807 Ok(PasswordToken { password: pw })
5808 }
5809
5810 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
5811 tl::functions::auth::SendCode {
5812 phone_number: phone.to_string(),
5813 api_id: self.inner.api_id,
5814 api_hash: self.inner.api_hash.clone(),
5815 settings: tl::enums::CodeSettings::CodeSettings(tl::types::CodeSettings {
5816 allow_flashcall: false,
5817 current_number: false,
5818 allow_app_hash: false,
5819 allow_missed_call: false,
5820 allow_firebase: false,
5821 unknown_number: false,
5822 logout_tokens: None,
5823 token: None,
5824 app_sandbox: None,
5825 }),
5826 }
5827 }
5828
5829 fn extract_user_name(user: &tl::enums::User) -> String {
5830 match user {
5831 tl::enums::User::User(u) => format!(
5832 "{} {}",
5833 u.first_name.as_deref().unwrap_or(""),
5834 u.last_name.as_deref().unwrap_or("")
5835 )
5836 .trim()
5837 .to_string(),
5838 tl::enums::User::Empty(_) => "(unknown)".into(),
5839 }
5840 }
5841
5842 #[allow(clippy::type_complexity)]
5843 fn extract_password_params(
5844 algo: &tl::enums::PasswordKdfAlgo,
5845 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
5846 match algo {
5847 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
5848 Ok((&a.salt1, &a.salt2, &a.p, a.g))
5849 }
5850 _ => Err(InvocationError::Deserialize(
5851 "unsupported password KDF algo".into(),
5852 )),
5853 }
5854 }
5855}
5856
5857pub(crate) fn attach_client_to_update(u: update::Update, client: &Client) -> update::Update {
5860 match u {
5861 update::Update::NewMessage(msg) => {
5862 update::Update::NewMessage(msg.with_client(client.clone()))
5863 }
5864 update::Update::MessageEdited(msg) => {
5865 update::Update::MessageEdited(msg.with_client(client.clone()))
5866 }
5867 other => other,
5868 }
5869}
5870
5871pub struct DialogIter {
5875 offset_date: i32,
5876 offset_id: i32,
5877 offset_peer: tl::enums::InputPeer,
5878 done: bool,
5879 buffer: VecDeque<Dialog>,
5880 pub total: Option<i32>,
5883}
5884
5885impl DialogIter {
5886 const PAGE_SIZE: i32 = 100;
5887
5888 pub fn total(&self) -> Option<i32> {
5894 self.total
5895 }
5896
5897 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
5899 if let Some(d) = self.buffer.pop_front() {
5900 return Ok(Some(d));
5901 }
5902 if self.done {
5903 return Ok(None);
5904 }
5905
5906 let req = tl::functions::messages::GetDialogs {
5907 exclude_pinned: false,
5908 folder_id: None,
5909 offset_date: self.offset_date,
5910 offset_id: self.offset_id,
5911 offset_peer: self.offset_peer.clone(),
5912 limit: Self::PAGE_SIZE,
5913 hash: 0,
5914 };
5915
5916 let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
5917 if self.total.is_none() {
5919 self.total = count;
5920 }
5921 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
5922 self.done = true;
5923 }
5924
5925 if let Some(last) = dialogs.last() {
5927 self.offset_date = last
5928 .message
5929 .as_ref()
5930 .map(|m| match m {
5931 tl::enums::Message::Message(x) => x.date,
5932 tl::enums::Message::Service(x) => x.date,
5933 _ => 0,
5934 })
5935 .unwrap_or(0);
5936 self.offset_id = last.top_message();
5937 if let Some(peer) = last.peer() {
5938 self.offset_peer = client.inner.peer_cache.read().await.peer_to_input(peer);
5939 }
5940 }
5941
5942 self.buffer.extend(dialogs);
5943 Ok(self.buffer.pop_front())
5944 }
5945}
5946
5947pub struct MessageIter {
5949 unresolved: Option<PeerRef>,
5950 peer: Option<tl::enums::Peer>,
5951 offset_id: i32,
5952 done: bool,
5953 buffer: VecDeque<update::IncomingMessage>,
5954 pub total: Option<i32>,
5958}
5959
5960impl MessageIter {
5961 const PAGE_SIZE: i32 = 100;
5962
5963 pub fn total(&self) -> Option<i32> {
5968 self.total
5969 }
5970
5971 pub async fn next(
5973 &mut self,
5974 client: &Client,
5975 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
5976 if let Some(m) = self.buffer.pop_front() {
5977 return Ok(Some(m));
5978 }
5979 if self.done {
5980 return Ok(None);
5981 }
5982
5983 let peer = if let Some(p) = &self.peer {
5985 p.clone()
5986 } else {
5987 let pr = self.unresolved.take().expect("MessageIter: peer not set");
5988 let p = pr.resolve(client).await?;
5989 self.peer = Some(p.clone());
5990 p
5991 };
5992
5993 let input_peer = client.inner.peer_cache.read().await.peer_to_input(&peer);
5994 let (page, count) = client
5995 .get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id)
5996 .await?;
5997
5998 if self.total.is_none() {
5999 self.total = count;
6000 }
6001
6002 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
6003 self.done = true;
6004 }
6005 if let Some(last) = page.last() {
6006 self.offset_id = last.id();
6007 }
6008
6009 self.buffer.extend(page);
6010 Ok(self.buffer.pop_front())
6011 }
6012}
6013
6014#[doc(hidden)]
6018pub fn random_i64_pub() -> i64 {
6019 random_i64()
6020}
6021
/// Returns `true` when `body` is exactly the four little-endian bytes of
/// the constructor id `0x997275b5`.
pub fn is_bool_true(body: &[u8]) -> bool {
    const BOOL_TRUE_ID: u32 = 0x997275b5;
    body == BOOL_TRUE_ID.to_le_bytes()
}
6025
/// Returns `true` when `body` is exactly the four little-endian bytes of
/// the constructor id `0xbc799737`.
pub fn is_bool_false(body: &[u8]) -> bool {
    const BOOL_FALSE_ID: u32 = 0xbc799737;
    body == BOOL_FALSE_ID.to_le_bytes()
}
6029
6030#[derive(Clone)]
6040enum FrameKind {
6041 Abridged,
6042 Intermediate,
6043 #[allow(dead_code)]
6044 Full {
6045 send_seqno: Arc<std::sync::atomic::AtomicU32>,
6046 recv_seqno: Arc<std::sync::atomic::AtomicU32>,
6047 },
6048 Obfuscated {
6050 cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
6051 },
6052 PaddedIntermediate {
6054 cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
6055 },
6056 FakeTls {
6058 cipher: std::sync::Arc<tokio::sync::Mutex<layer_crypto::ObfuscatedCipher>>,
6059 },
6060}
6061
/// A server salt together with the time window in which it is valid.
#[derive(Clone, Debug)]
struct FutureSalt {
    // Start of the validity window (compared against server time in seconds).
    valid_since: i32,
    // End of the validity window; the salt is pruned once server time passes it.
    valid_until: i32,
    // The salt value itself.
    salt: i64,
}

/// Seconds a salt must already have been valid before the writer switches
/// to it.
const SALT_USE_DELAY: i32 = 60;
6077
6078struct ConnectionWriter {
6080 enc: EncryptedSession,
6081 frame_kind: FrameKind,
6082 pending_ack: Vec<i64>,
6086 sent_bodies: std::collections::HashMap<i64, Vec<u8>>,
6090 container_map: std::collections::HashMap<i64, i64>,
6096 salts: Vec<FutureSalt>,
6101 start_salt_time: Option<(i32, std::time::Instant)>,
6106}
6107
6108impl ConnectionWriter {
6109 fn auth_key_bytes(&self) -> [u8; 256] {
6110 self.enc.auth_key_bytes()
6111 }
6112 fn first_salt(&self) -> i64 {
6113 self.enc.salt
6114 }
6115 fn time_offset(&self) -> i32 {
6116 self.enc.time_offset
6117 }
6118
6119 fn advance_salt_if_needed(&mut self) -> bool {
6133 let Some((server_now, start_instant)) = self.start_salt_time else {
6134 return self.salts.len() <= 1;
6135 };
6136
6137 let now = server_now + start_instant.elapsed().as_secs() as i32;
6139
6140 while self.salts.len() > 1 && now > self.salts[0].valid_until {
6142 let expired = self.salts.remove(0);
6143 tracing::debug!(
6144 "[layer] salt {:#x} expired (valid_until={}), pruned",
6145 expired.salt,
6146 expired.valid_until,
6147 );
6148 }
6149
6150 if self.salts.len() > 1 {
6153 let best = self
6154 .salts
6155 .iter()
6156 .rev()
6157 .find(|s| s.valid_since + SALT_USE_DELAY <= now)
6158 .map(|s| s.salt);
6159 if let Some(salt) = best {
6160 if salt != self.enc.salt {
6161 tracing::debug!(
6162 "[layer] proactive salt cycle: {:#x} → {:#x}",
6163 self.enc.salt,
6164 salt
6165 );
6166 self.enc.salt = salt;
6167 self.salts.retain(|s| s.valid_since >= now - SALT_USE_DELAY);
6169 if self.salts.is_empty() {
6170 self.salts.push(FutureSalt {
6172 valid_since: 0,
6173 valid_until: i32::MAX,
6174 salt,
6175 });
6176 }
6177 }
6178 }
6179 }
6180
6181 self.salts.len() <= 1
6182 }
6183}
6184
6185struct Connection {
6186 stream: TcpStream,
6187 enc: EncryptedSession,
6188 frame_kind: FrameKind,
6189}
6190
6191impl Connection {
6192 async fn open_stream(
6194 addr: &str,
6195 socks5: Option<&crate::socks5::Socks5Config>,
6196 transport: &TransportKind,
6197 dc_id: i16,
6198 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6199 let stream = match socks5 {
6200 Some(proxy) => proxy.connect(addr).await?,
6201 None => {
6202 let stream = TcpStream::connect(addr)
6203 .await
6204 .map_err(InvocationError::Io)?;
6205 stream.set_nodelay(true).ok();
6206 {
6207 let sock = socket2::SockRef::from(&stream);
6208 let keepalive = TcpKeepalive::new()
6209 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
6210 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
6211 #[cfg(not(target_os = "windows"))]
6212 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
6213 sock.set_tcp_keepalive(&keepalive).ok();
6214 }
6215 stream
6216 }
6217 };
6218 Self::apply_transport_init(stream, transport, dc_id).await
6219 }
6220
6221 async fn open_stream_mtproxy(
6224 mtproxy: &crate::proxy::MtProxyConfig,
6225 dc_id: i16,
6226 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6227 let stream = mtproxy.connect().await?;
6228 stream.set_nodelay(true).ok();
6229 Self::apply_transport_init(stream, &mtproxy.transport, dc_id).await
6230 }
6231
6232 async fn apply_transport_init(
6233 mut stream: TcpStream,
6234 transport: &TransportKind,
6235 dc_id: i16,
6236 ) -> Result<(TcpStream, FrameKind), InvocationError> {
6237 match transport {
6238 TransportKind::Abridged => {
6239 stream.write_all(&[0xef]).await?;
6240 Ok((stream, FrameKind::Abridged))
6241 }
6242 TransportKind::Intermediate => {
6243 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
6244 Ok((stream, FrameKind::Intermediate))
6245 }
6246 TransportKind::Full => {
6247 Ok((
6249 stream,
6250 FrameKind::Full {
6251 send_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
6252 recv_seqno: Arc::new(std::sync::atomic::AtomicU32::new(0)),
6253 },
6254 ))
6255 }
6256 TransportKind::Obfuscated { secret } => {
6257 use sha2::Digest;
6258
6259 let mut nonce = [0u8; 64];
6264 loop {
6265 getrandom::getrandom(&mut nonce)
6266 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
6267 let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
6268 let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
6269 let bad = nonce[0] == 0xEF
6270 || first == 0x44414548 || first == 0x54534F50 || first == 0x20544547 || first == 0xEEEEEEEE
6274 || first == 0xDDDDDDDD
6275 || first == 0x02010316
6276 || second == 0x00000000;
6277 if !bad {
6278 break;
6279 }
6280 }
6281
6282 let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
6288 let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
6289 let mut rev48 = nonce[8..56].to_vec();
6290 rev48.reverse();
6291 let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
6292 let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
6293
6294 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
6295 let mut h = sha2::Sha256::new();
6296 h.update(tx_raw);
6297 h.update(s.as_ref());
6298 let tx: [u8; 32] = h.finalize().into();
6299
6300 let mut h = sha2::Sha256::new();
6301 h.update(rx_raw);
6302 h.update(s.as_ref());
6303 let rx: [u8; 32] = h.finalize().into();
6304 (tx, rx)
6305 } else {
6306 (tx_raw, rx_raw)
6307 };
6308
6309 nonce[56] = 0xef;
6312 nonce[57] = 0xef;
6313 nonce[58] = 0xef;
6314 nonce[59] = 0xef;
6315 let dc_bytes = dc_id.to_le_bytes();
6316 nonce[60] = dc_bytes[0];
6317 nonce[61] = dc_bytes[1];
6318
6319 let mut cipher =
6329 layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
6330 let mut skip = [0u8; 56];
6332 cipher.encrypt(&mut skip);
6333 cipher.encrypt(&mut nonce[56..64]);
6335
6336 stream.write_all(&nonce).await?;
6337
6338 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
6339 Ok((stream, FrameKind::Obfuscated { cipher: cipher_arc }))
6340 }
6341 TransportKind::PaddedIntermediate { secret } => {
6342 use sha2::Digest;
6343 let mut nonce = [0u8; 64];
6344 loop {
6345 getrandom::getrandom(&mut nonce)
6346 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
6347 let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
6348 let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
6349 let bad = nonce[0] == 0xEF
6350 || first == 0x44414548
6351 || first == 0x54534F50
6352 || first == 0x20544547
6353 || first == 0xEEEEEEEE
6354 || first == 0xDDDDDDDD
6355 || first == 0x02010316
6356 || second == 0x00000000;
6357 if !bad {
6358 break;
6359 }
6360 }
6361 let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
6362 let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
6363 let mut rev48 = nonce[8..56].to_vec();
6364 rev48.reverse();
6365 let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
6366 let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
6367 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
6368 let mut h = sha2::Sha256::new();
6369 h.update(tx_raw);
6370 h.update(s.as_ref());
6371 let tx: [u8; 32] = h.finalize().into();
6372 let mut h = sha2::Sha256::new();
6373 h.update(rx_raw);
6374 h.update(s.as_ref());
6375 let rx: [u8; 32] = h.finalize().into();
6376 (tx, rx)
6377 } else {
6378 (tx_raw, rx_raw)
6379 };
6380 nonce[56] = 0xdd;
6382 nonce[57] = 0xdd;
6383 nonce[58] = 0xdd;
6384 nonce[59] = 0xdd;
6385 let dc_bytes = dc_id.to_le_bytes();
6386 nonce[60] = dc_bytes[0];
6387 nonce[61] = dc_bytes[1];
6388 let mut cipher =
6389 layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
6390 let mut skip = [0u8; 56];
6391 cipher.encrypt(&mut skip);
6392 cipher.encrypt(&mut nonce[56..64]);
6393 stream.write_all(&nonce).await?;
6394 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
6395 Ok((stream, FrameKind::PaddedIntermediate { cipher: cipher_arc }))
6396 }
6397 TransportKind::FakeTls { secret, domain } => {
6398 let domain_bytes = domain.as_bytes();
6402 let mut session_id = [0u8; 32];
6403 getrandom::getrandom(&mut session_id)
6404 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
6405
6406 let cipher_suites: &[u8] = &[0x00, 0x04, 0x13, 0x01, 0x13, 0x02];
6408 let compression: &[u8] = &[0x01, 0x00];
6409 let sni_name_len = domain_bytes.len() as u16;
6410 let sni_list_len = sni_name_len + 3;
6411 let sni_ext_len = sni_list_len + 2;
6412 let mut sni_ext = Vec::new();
6413 sni_ext.extend_from_slice(&[0x00, 0x00]);
6414 sni_ext.extend_from_slice(&sni_ext_len.to_be_bytes());
6415 sni_ext.extend_from_slice(&sni_list_len.to_be_bytes());
6416 sni_ext.push(0x00);
6417 sni_ext.extend_from_slice(&sni_name_len.to_be_bytes());
6418 sni_ext.extend_from_slice(domain_bytes);
6419 let sup_ver: &[u8] = &[0x00, 0x2b, 0x00, 0x03, 0x02, 0x03, 0x04];
6420 let sup_grp: &[u8] = &[0x00, 0x0a, 0x00, 0x04, 0x00, 0x02, 0x00, 0x1d];
6421 let sess_tick: &[u8] = &[0x00, 0x23, 0x00, 0x00];
6422 let ext_body_len = sni_ext.len() + sup_ver.len() + sup_grp.len() + sess_tick.len();
6423 let mut extensions = Vec::new();
6424 extensions.extend_from_slice(&(ext_body_len as u16).to_be_bytes());
6425 extensions.extend_from_slice(&sni_ext);
6426 extensions.extend_from_slice(sup_ver);
6427 extensions.extend_from_slice(sup_grp);
6428 extensions.extend_from_slice(sess_tick);
6429
6430 let mut hello_body = Vec::new();
6431 hello_body.extend_from_slice(&[0x03, 0x03]);
6432 hello_body.extend_from_slice(&[0u8; 32]); hello_body.push(session_id.len() as u8);
6434 hello_body.extend_from_slice(&session_id);
6435 hello_body.extend_from_slice(cipher_suites);
6436 hello_body.extend_from_slice(compression);
6437 hello_body.extend_from_slice(&extensions);
6438
6439 let hs_len = hello_body.len() as u32;
6440 let mut handshake = Vec::new();
6441 handshake.push(0x01);
6442 handshake.push(((hs_len >> 16) & 0xff) as u8);
6443 handshake.push(((hs_len >> 8) & 0xff) as u8);
6444 handshake.push((hs_len & 0xff) as u8);
6445 handshake.extend_from_slice(&hello_body);
6446
6447 let rec_len = handshake.len() as u16;
6448 let mut record = Vec::new();
6449 record.push(0x16);
6450 record.extend_from_slice(&[0x03, 0x01]);
6451 record.extend_from_slice(&rec_len.to_be_bytes());
6452 record.extend_from_slice(&handshake);
6453
6454 use sha2::Digest;
6456 let random_offset = 5 + 4 + 2; let hmac_result: [u8; 32] = {
6458 use hmac::{Hmac, Mac};
6459 type HmacSha256 = Hmac<sha2::Sha256>;
6460 let mut mac = HmacSha256::new_from_slice(secret)
6461 .map_err(|_| InvocationError::Deserialize("HMAC key error".into()))?;
6462 mac.update(&record);
6463 mac.finalize().into_bytes().into()
6464 };
6465 record[random_offset..random_offset + 32].copy_from_slice(&hmac_result);
6466 stream.write_all(&record).await?;
6467
6468 let mut h = sha2::Sha256::new();
6470 h.update(secret.as_ref());
6471 h.update(&hmac_result);
6472 let derived: [u8; 32] = h.finalize().into();
6473 let iv = [0u8; 16];
6474 let cipher =
6475 layer_crypto::ObfuscatedCipher::from_keys(&derived, &iv, &derived, &iv);
6476 let cipher_arc = std::sync::Arc::new(tokio::sync::Mutex::new(cipher));
6477 Ok((stream, FrameKind::FakeTls { cipher: cipher_arc }))
6478 }
6479 }
6480 }
6481
6482 async fn connect_raw(
6483 addr: &str,
6484 socks5: Option<&crate::socks5::Socks5Config>,
6485 mtproxy: Option<&crate::proxy::MtProxyConfig>,
6486 transport: &TransportKind,
6487 dc_id: i16,
6488 ) -> Result<Self, InvocationError> {
6489 tracing::debug!("[layer] Connecting to {addr} (DH) …");
6490
6491 let addr2 = addr.to_string();
6492 let socks5_c = socks5.cloned();
6493 let mtproxy_c = mtproxy.cloned();
6494 let transport_c = transport.clone();
6495
6496 let fut = async move {
6497 let (mut stream, frame_kind) = if let Some(ref mp) = mtproxy_c {
6498 Self::open_stream_mtproxy(mp, dc_id).await?
6499 } else {
6500 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?
6501 };
6502
6503 let mut plain = Session::new();
6504
6505 let (req1, s1) =
6506 auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6507 send_frame(
6508 &mut stream,
6509 &plain.pack(&req1).to_plaintext_bytes(),
6510 &frame_kind,
6511 )
6512 .await?;
6513 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
6514
6515 let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
6516 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6517 send_frame(
6518 &mut stream,
6519 &plain.pack(&req2).to_plaintext_bytes(),
6520 &frame_kind,
6521 )
6522 .await?;
6523 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
6524
6525 let (req3, s3) =
6526 auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6527 send_frame(
6528 &mut stream,
6529 &plain.pack(&req3).to_plaintext_bytes(),
6530 &frame_kind,
6531 )
6532 .await?;
6533 let ans: tl::enums::SetClientDhParamsAnswer =
6534 recv_frame_plain(&mut stream, &frame_kind).await?;
6535
6536 let done = {
6538 let mut result = auth::finish(s3, ans)
6539 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6540 let mut attempts = 0u8;
6541 loop {
6542 match result {
6543 auth::FinishResult::Done(d) => break d,
6544 auth::FinishResult::Retry {
6545 retry_id,
6546 dh_params,
6547 nonce,
6548 server_nonce,
6549 new_nonce,
6550 } => {
6551 attempts += 1;
6552 if attempts >= 5 {
6553 return Err(InvocationError::Deserialize(
6554 "dh_gen_retry exceeded 5 attempts".into(),
6555 ));
6556 }
6557 let (req_retry, s3_retry) = auth::retry_step3(
6558 &dh_params,
6559 nonce,
6560 server_nonce,
6561 new_nonce,
6562 retry_id,
6563 )
6564 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6565 send_frame(
6566 &mut stream,
6567 &plain.pack(&req_retry).to_plaintext_bytes(),
6568 &frame_kind,
6569 )
6570 .await?;
6571 let ans_retry: tl::enums::SetClientDhParamsAnswer =
6572 recv_frame_plain(&mut stream, &frame_kind).await?;
6573 result = auth::finish(s3_retry, ans_retry)
6574 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
6575 }
6576 }
6577 }
6578 };
6579 tracing::debug!("[layer] DH complete ✓");
6580
6581 Ok::<Self, InvocationError>(Self {
6582 stream,
6583 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
6584 frame_kind,
6585 })
6586 };
6587
6588 tokio::time::timeout(Duration::from_secs(15), fut)
6589 .await
6590 .map_err(|_| {
6591 InvocationError::Deserialize(format!(
6592 "DH handshake with {addr} timed out after 15 s"
6593 ))
6594 })?
6595 }
6596
6597 async fn connect_with_key(
6598 addr: &str,
6599 auth_key: [u8; 256],
6600 first_salt: i64,
6601 time_offset: i32,
6602 socks5: Option<&crate::socks5::Socks5Config>,
6603 transport: &TransportKind,
6604 dc_id: i16,
6605 ) -> Result<Self, InvocationError> {
6606 let addr2 = addr.to_string();
6607 let socks5_c = socks5.cloned();
6608 let transport_c = transport.clone();
6609
6610 let fut = async move {
6611 let (stream, frame_kind) =
6612 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c, dc_id).await?;
6613 Ok::<Self, InvocationError>(Self {
6614 stream,
6615 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
6616 frame_kind,
6617 })
6618 };
6619
6620 tokio::time::timeout(Duration::from_secs(15), fut)
6621 .await
6622 .map_err(|_| {
6623 InvocationError::Deserialize(format!(
6624 "connect_with_key to {addr} timed out after 15 s"
6625 ))
6626 })?
6627 }
6628
6629 fn auth_key_bytes(&self) -> [u8; 256] {
6630 self.enc.auth_key_bytes()
6631 }
6632
6633 fn into_writer(self) -> (ConnectionWriter, OwnedWriteHalf, OwnedReadHalf, FrameKind) {
6635 let (read_half, write_half) = self.stream.into_split();
6636 let writer = ConnectionWriter {
6637 enc: self.enc,
6638 frame_kind: self.frame_kind.clone(),
6639 pending_ack: Vec::new(),
6640 sent_bodies: std::collections::HashMap::new(),
6641 container_map: std::collections::HashMap::new(),
6642 salts: Vec::new(),
6643 start_salt_time: None,
6644 };
6645 (writer, write_half, read_half, self.frame_kind)
6646 }
6647}
6648
6649async fn send_frame(
6653 stream: &mut TcpStream,
6654 data: &[u8],
6655 kind: &FrameKind,
6656) -> Result<(), InvocationError> {
6657 match kind {
6658 FrameKind::Abridged => send_abridged(stream, data).await,
6659 FrameKind::Intermediate => {
6660 let mut frame = Vec::with_capacity(4 + data.len());
6661 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
6662 frame.extend_from_slice(data);
6663 stream.write_all(&frame).await?;
6664 Ok(())
6665 }
6666 FrameKind::Full { send_seqno, .. } => {
6667 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6670 let total_len = (data.len() as u32) + 12;
6671 let mut packet = Vec::with_capacity(total_len as usize);
6672 packet.extend_from_slice(&total_len.to_le_bytes());
6673 packet.extend_from_slice(&seq.to_le_bytes());
6674 packet.extend_from_slice(data);
6675 let crc = crate::transport_intermediate::crc32_ieee(&packet);
6676 packet.extend_from_slice(&crc.to_le_bytes());
6677 stream.write_all(&packet).await?;
6678 Ok(())
6679 }
6680 FrameKind::Obfuscated { cipher } => {
6681 let words = data.len() / 4;
6683 let mut frame = if words < 0x7f {
6684 let mut v = Vec::with_capacity(1 + data.len());
6685 v.push(words as u8);
6686 v
6687 } else {
6688 let mut v = Vec::with_capacity(4 + data.len());
6689 v.extend_from_slice(&[
6690 0x7f,
6691 (words & 0xff) as u8,
6692 ((words >> 8) & 0xff) as u8,
6693 ((words >> 16) & 0xff) as u8,
6694 ]);
6695 v
6696 };
6697 frame.extend_from_slice(data);
6698 cipher.lock().await.encrypt(&mut frame);
6699 stream.write_all(&frame).await?;
6700 Ok(())
6701 }
6702 FrameKind::PaddedIntermediate { cipher } => {
6703 let mut pad_len_buf = [0u8; 1];
6705 getrandom::getrandom(&mut pad_len_buf).ok();
6706 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
6707 let total_payload = data.len() + pad_len;
6708 let mut frame = Vec::with_capacity(4 + total_payload);
6709 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
6710 frame.extend_from_slice(data);
6711 let mut pad = vec![0u8; pad_len];
6712 getrandom::getrandom(&mut pad).ok();
6713 frame.extend_from_slice(&pad);
6714 cipher.lock().await.encrypt(&mut frame);
6715 stream.write_all(&frame).await?;
6716 Ok(())
6717 }
6718 FrameKind::FakeTls { cipher } => {
6719 const TLS_APP_DATA: u8 = 0x17;
6723 const TLS_VER: [u8; 2] = [0x03, 0x03];
6724 const CHUNK: usize = 2878;
6726 let mut locked = cipher.lock().await;
6727 for chunk in data.chunks(CHUNK) {
6728 let chunk_len = chunk.len() as u16;
6729 let mut record = Vec::with_capacity(5 + chunk.len());
6730 record.push(TLS_APP_DATA);
6731 record.extend_from_slice(&TLS_VER);
6732 record.extend_from_slice(&chunk_len.to_be_bytes());
6733 record.extend_from_slice(chunk);
6734 locked.encrypt(&mut record[5..]);
6736 stream.write_all(&record).await?;
6737 }
6738 Ok(())
6739 }
6740 }
6741}
6742
/// Result of one `recv_frame_with_keepalive` call.
enum FrameOutcome {
    /// A frame was read; carries its raw bytes.
    Frame(Vec<u8>),
    /// Reading the frame, or sending the keepalive ping, failed.
    Error(InvocationError),
    /// No frame arrived before the timeout and a keepalive ping was sent successfully.
    Keepalive,
}
6751
6752async fn recv_frame_with_keepalive(
6759 rh: &mut OwnedReadHalf,
6760 fk: &FrameKind,
6761 client: &Client,
6762 _ak: &[u8; 256],
6763) -> FrameOutcome {
6764 match tokio::time::timeout(
6765 Duration::from_secs(PING_DELAY_SECS),
6766 recv_frame_read(rh, fk),
6767 )
6768 .await
6769 {
6770 Ok(Ok(raw)) => FrameOutcome::Frame(raw),
6771 Ok(Err(e)) => FrameOutcome::Error(e),
6772 Err(_) => {
6773 let ping_req = tl::functions::PingDelayDisconnect {
6777 ping_id: random_i64(),
6778 disconnect_delay: NO_PING_DISCONNECT,
6779 };
6780 let (wire, fk) = {
6781 let mut w = client.inner.writer.lock().await;
6782 let fk = w.frame_kind.clone();
6783 (w.enc.pack(&ping_req), fk)
6784 };
6785 match send_frame_write(&mut *client.inner.write_half.lock().await, &wire, &fk).await {
6786 Ok(()) => FrameOutcome::Keepalive,
6787 Err(e) => FrameOutcome::Error(e),
6788 }
6789 }
6790 }
6791}
6792
6793async fn send_frame_write(
6800 stream: &mut OwnedWriteHalf,
6801 data: &[u8],
6802 kind: &FrameKind,
6803) -> Result<(), InvocationError> {
6804 match kind {
6805 FrameKind::Abridged => {
6806 let words = data.len() / 4;
6807 let mut frame = if words < 0x7f {
6809 let mut v = Vec::with_capacity(1 + data.len());
6810 v.push(words as u8);
6811 v
6812 } else {
6813 let mut v = Vec::with_capacity(4 + data.len());
6814 v.extend_from_slice(&[
6815 0x7f,
6816 (words & 0xff) as u8,
6817 ((words >> 8) & 0xff) as u8,
6818 ((words >> 16) & 0xff) as u8,
6819 ]);
6820 v
6821 };
6822 frame.extend_from_slice(data);
6823 stream.write_all(&frame).await?;
6824 Ok(())
6825 }
6826 FrameKind::Intermediate => {
6827 let mut frame = Vec::with_capacity(4 + data.len());
6828 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
6829 frame.extend_from_slice(data);
6830 stream.write_all(&frame).await?;
6831 Ok(())
6832 }
6833 FrameKind::Full { send_seqno, .. } => {
6834 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6836 let total_len = (data.len() as u32) + 12;
6837 let mut packet = Vec::with_capacity(total_len as usize);
6838 packet.extend_from_slice(&total_len.to_le_bytes());
6839 packet.extend_from_slice(&seq.to_le_bytes());
6840 packet.extend_from_slice(data);
6841 let crc = crate::transport_intermediate::crc32_ieee(&packet);
6842 packet.extend_from_slice(&crc.to_le_bytes());
6843 stream.write_all(&packet).await?;
6844 Ok(())
6845 }
6846 FrameKind::Obfuscated { cipher } => {
6847 let words = data.len() / 4;
6849 let mut frame = if words < 0x7f {
6850 let mut v = Vec::with_capacity(1 + data.len());
6851 v.push(words as u8);
6852 v
6853 } else {
6854 let mut v = Vec::with_capacity(4 + data.len());
6855 v.extend_from_slice(&[
6856 0x7f,
6857 (words & 0xff) as u8,
6858 ((words >> 8) & 0xff) as u8,
6859 ((words >> 16) & 0xff) as u8,
6860 ]);
6861 v
6862 };
6863 frame.extend_from_slice(data);
6864 cipher.lock().await.encrypt(&mut frame);
6865 stream.write_all(&frame).await?;
6866 Ok(())
6867 }
6868 FrameKind::PaddedIntermediate { cipher } => {
6869 let mut pad_len_buf = [0u8; 1];
6870 getrandom::getrandom(&mut pad_len_buf).ok();
6871 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
6872 let total_payload = data.len() + pad_len;
6873 let mut frame = Vec::with_capacity(4 + total_payload);
6874 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
6875 frame.extend_from_slice(data);
6876 let mut pad = vec![0u8; pad_len];
6877 getrandom::getrandom(&mut pad).ok();
6878 frame.extend_from_slice(&pad);
6879 cipher.lock().await.encrypt(&mut frame);
6880 stream.write_all(&frame).await?;
6881 Ok(())
6882 }
6883 FrameKind::FakeTls { cipher } => {
6884 const TLS_APP_DATA: u8 = 0x17;
6885 const TLS_VER: [u8; 2] = [0x03, 0x03];
6886 const CHUNK: usize = 2878;
6887 let mut locked = cipher.lock().await;
6888 for chunk in data.chunks(CHUNK) {
6889 let chunk_len = chunk.len() as u16;
6890 let mut record = Vec::with_capacity(5 + chunk.len());
6891 record.push(TLS_APP_DATA);
6892 record.extend_from_slice(&TLS_VER);
6893 record.extend_from_slice(&chunk_len.to_be_bytes());
6894 record.extend_from_slice(chunk);
6895 locked.encrypt(&mut record[5..]);
6896 stream.write_all(&record).await?;
6897 }
6898 Ok(())
6899 }
6900 }
6901}
6902
6903async fn recv_frame_read(
6905 stream: &mut OwnedReadHalf,
6906 kind: &FrameKind,
6907) -> Result<Vec<u8>, InvocationError> {
6908 match kind {
6909 FrameKind::Abridged => {
6910 let mut h = [0u8; 1];
6911 stream.read_exact(&mut h).await?;
6912 let words = if h[0] < 0x7f {
6913 h[0] as usize
6914 } else {
6915 let mut b = [0u8; 3];
6916 stream.read_exact(&mut b).await?;
6917 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
6918 };
6919 let len = words * 4;
6920 let mut buf = vec![0u8; len];
6921 stream.read_exact(&mut buf).await?;
6922 if buf.len() == 4 {
6923 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
6924 if code < 0 {
6925 return Err(InvocationError::Rpc(RpcError::from_telegram(
6926 code,
6927 "transport error",
6928 )));
6929 }
6930 }
6931 Ok(buf)
6932 }
6933 FrameKind::Intermediate => {
6934 let mut len_buf = [0u8; 4];
6935 stream.read_exact(&mut len_buf).await?;
6936 let len_i32 = i32::from_le_bytes(len_buf);
6937 if len_i32 < 0 {
6938 return Err(InvocationError::Rpc(RpcError::from_telegram(
6939 len_i32,
6940 "transport error",
6941 )));
6942 }
6943 if len_i32 <= 4 {
6944 let mut code_buf = [0u8; 4];
6945 stream.read_exact(&mut code_buf).await?;
6946 let code = i32::from_le_bytes(code_buf);
6947 return Err(InvocationError::Rpc(RpcError::from_telegram(
6948 code,
6949 "transport error",
6950 )));
6951 }
6952 let len = len_i32 as usize;
6953 let mut buf = vec![0u8; len];
6954 stream.read_exact(&mut buf).await?;
6955 Ok(buf)
6956 }
6957 FrameKind::Full { recv_seqno, .. } => {
6958 let mut len_buf = [0u8; 4];
6959 stream.read_exact(&mut len_buf).await?;
6960 let total_len_i32 = i32::from_le_bytes(len_buf);
6961 if total_len_i32 < 0 {
6962 return Err(InvocationError::Rpc(RpcError::from_telegram(
6963 total_len_i32,
6964 "transport error",
6965 )));
6966 }
6967 let total_len = total_len_i32 as usize;
6968 if total_len < 12 {
6969 return Err(InvocationError::Deserialize(
6970 "Full transport: packet too short".into(),
6971 ));
6972 }
6973 let mut rest = vec![0u8; total_len - 4];
6974 stream.read_exact(&mut rest).await?;
6975 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
6976 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
6977 let mut check_input = Vec::with_capacity(4 + body.len());
6978 check_input.extend_from_slice(&len_buf);
6979 check_input.extend_from_slice(body);
6980 let actual_crc = crate::transport_intermediate::crc32_ieee(&check_input);
6981 if actual_crc != expected_crc {
6982 return Err(InvocationError::Deserialize(format!(
6983 "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
6984 )));
6985 }
6986 let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
6987 let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6988 if recv_seq != expected_seq {
6989 return Err(InvocationError::Deserialize(format!(
6990 "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
6991 )));
6992 }
6993 Ok(body[4..].to_vec())
6994 }
6995 FrameKind::Obfuscated { cipher } => {
6996 let mut h = [0u8; 1];
6997 stream.read_exact(&mut h).await?;
6998 cipher.lock().await.decrypt(&mut h);
6999 let words = if h[0] < 0x7f {
7000 h[0] as usize
7001 } else {
7002 let mut b = [0u8; 3];
7003 stream.read_exact(&mut b).await?;
7004 cipher.lock().await.decrypt(&mut b);
7005 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7006 };
7007 let mut buf = vec![0u8; words * 4];
7008 stream.read_exact(&mut buf).await?;
7009 cipher.lock().await.decrypt(&mut buf);
7010 if buf.len() == 4 {
7011 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
7012 if code < 0 {
7013 return Err(InvocationError::Rpc(RpcError::from_telegram(
7014 code,
7015 "transport error",
7016 )));
7017 }
7018 }
7019 Ok(buf)
7020 }
7021 FrameKind::PaddedIntermediate { cipher } => {
7022 let mut len_buf = [0u8; 4];
7024 stream.read_exact(&mut len_buf).await?;
7025 cipher.lock().await.decrypt(&mut len_buf);
7026 let total_len = i32::from_le_bytes(len_buf);
7027 if total_len < 0 {
7028 return Err(InvocationError::Rpc(RpcError::from_telegram(
7029 total_len,
7030 "transport error",
7031 )));
7032 }
7033 let mut buf = vec![0u8; total_len as usize];
7034 stream.read_exact(&mut buf).await?;
7035 cipher.lock().await.decrypt(&mut buf);
7036 Ok(buf)
7040 }
7041 FrameKind::FakeTls { cipher } => {
7042 let mut hdr = [0u8; 5];
7044 stream.read_exact(&mut hdr).await?;
7045 if hdr[0] != 0x17 {
7046 return Err(InvocationError::Deserialize(format!(
7047 "FakeTLS: unexpected record type 0x{:02x}",
7048 hdr[0]
7049 )));
7050 }
7051 let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
7052 let mut buf = vec![0u8; payload_len];
7053 stream.read_exact(&mut buf).await?;
7054 cipher.lock().await.decrypt(&mut buf);
7055 Ok(buf)
7056 }
7057 }
7058}
7059
7060async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
7062 let words = data.len() / 4;
7063 let mut frame = if words < 0x7f {
7065 let mut v = Vec::with_capacity(1 + data.len());
7066 v.push(words as u8);
7067 v
7068 } else {
7069 let mut v = Vec::with_capacity(4 + data.len());
7070 v.extend_from_slice(&[
7071 0x7f,
7072 (words & 0xff) as u8,
7073 ((words >> 8) & 0xff) as u8,
7074 ((words >> 16) & 0xff) as u8,
7075 ]);
7076 v
7077 };
7078 frame.extend_from_slice(data);
7079 stream.write_all(&frame).await?;
7080 Ok(())
7081}
7082
7083async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
7084 let mut h = [0u8; 1];
7085 stream.read_exact(&mut h).await?;
7086 let words = if h[0] < 0x7f {
7087 h[0] as usize
7088 } else {
7089 let mut b = [0u8; 3];
7090 stream.read_exact(&mut b).await?;
7091 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
7092 if w == 1 {
7094 let mut code_buf = [0u8; 4];
7095 stream.read_exact(&mut code_buf).await?;
7096 let code = i32::from_le_bytes(code_buf);
7097 return Err(InvocationError::Rpc(RpcError::from_telegram(
7098 code,
7099 "transport error",
7100 )));
7101 }
7102 w
7103 };
7104 if words == 0 || words > 0x8000 {
7107 return Err(InvocationError::Deserialize(format!(
7108 "abridged: implausible word count {words} (possible transport error or framing mismatch)"
7109 )));
7110 }
7111 let mut buf = vec![0u8; words * 4];
7112 stream.read_exact(&mut buf).await?;
7113 Ok(buf)
7114}
7115
7116async fn recv_frame_plain<T: Deserializable>(
7118 stream: &mut TcpStream,
7119 kind: &FrameKind,
7120) -> Result<T, InvocationError> {
7121 let raw = match kind {
7128 FrameKind::Abridged => recv_abridged(stream).await?,
7129 FrameKind::Intermediate => {
7130 let mut len_buf = [0u8; 4];
7131 stream.read_exact(&mut len_buf).await?;
7132 let len = u32::from_le_bytes(len_buf) as usize;
7133 if len == 0 || len > 1 << 24 {
7134 return Err(InvocationError::Deserialize(format!(
7135 "plaintext frame: implausible length {len}"
7136 )));
7137 }
7138 let mut buf = vec![0u8; len];
7139 stream.read_exact(&mut buf).await?;
7140 buf
7141 }
7142 FrameKind::Full { recv_seqno, .. } => {
7143 let mut len_buf = [0u8; 4];
7145 stream.read_exact(&mut len_buf).await?;
7146 let total_len = u32::from_le_bytes(len_buf) as usize;
7147 if total_len < 12 || total_len > (1 << 24) + 12 {
7148 return Err(InvocationError::Deserialize(format!(
7149 "Full plaintext frame: implausible total_len {total_len}"
7150 )));
7151 }
7152 let mut rest = vec![0u8; total_len - 4];
7153 stream.read_exact(&mut rest).await?;
7154
7155 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
7157 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
7158 let mut check_input = Vec::with_capacity(4 + body.len());
7159 check_input.extend_from_slice(&len_buf);
7160 check_input.extend_from_slice(body);
7161 let actual_crc = crate::transport_intermediate::crc32_ieee(&check_input);
7162 if actual_crc != expected_crc {
7163 return Err(InvocationError::Deserialize(format!(
7164 "Full plaintext: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
7165 )));
7166 }
7167
7168 let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
7170 let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
7171 if recv_seq != expected_seq {
7172 return Err(InvocationError::Deserialize(format!(
7173 "Full plaintext: seqno mismatch (got {recv_seq}, expected {expected_seq})"
7174 )));
7175 }
7176
7177 body[4..].to_vec()
7178 }
7179 FrameKind::Obfuscated { cipher } => {
7180 let mut h = [0u8; 1];
7182 stream.read_exact(&mut h).await?;
7183 cipher.lock().await.decrypt(&mut h);
7184 let words = if h[0] < 0x7f {
7185 h[0] as usize
7186 } else {
7187 let mut b = [0u8; 3];
7188 stream.read_exact(&mut b).await?;
7189 cipher.lock().await.decrypt(&mut b);
7190 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
7191 };
7192 let mut buf = vec![0u8; words * 4];
7193 stream.read_exact(&mut buf).await?;
7194 cipher.lock().await.decrypt(&mut buf);
7195 buf
7196 }
7197 FrameKind::PaddedIntermediate { cipher } => {
7198 let mut len_buf = [0u8; 4];
7199 stream.read_exact(&mut len_buf).await?;
7200 cipher.lock().await.decrypt(&mut len_buf);
7201 let len = u32::from_le_bytes(len_buf) as usize;
7202 if len == 0 || len > 1 << 24 {
7203 return Err(InvocationError::Deserialize(format!(
7204 "PaddedIntermediate plaintext: implausible length {len}"
7205 )));
7206 }
7207 let mut buf = vec![0u8; len];
7208 stream.read_exact(&mut buf).await?;
7209 cipher.lock().await.decrypt(&mut buf);
7210 buf
7211 }
7212 FrameKind::FakeTls { cipher } => {
7213 let mut hdr = [0u8; 5];
7214 stream.read_exact(&mut hdr).await?;
7215 if hdr[0] != 0x17 {
7216 return Err(InvocationError::Deserialize(format!(
7217 "FakeTLS plaintext: unexpected record type 0x{:02x}",
7218 hdr[0]
7219 )));
7220 }
7221 let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
7222 let mut buf = vec![0u8; payload_len];
7223 stream.read_exact(&mut buf).await?;
7224 cipher.lock().await.decrypt(&mut buf);
7225 buf
7226 }
7227 };
7228 if raw.len() < 20 {
7229 return Err(InvocationError::Deserialize(
7230 "plaintext frame too short".into(),
7231 ));
7232 }
7233 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
7234 return Err(InvocationError::Deserialize(
7235 "expected auth_key_id=0 in plaintext".into(),
7236 ));
7237 }
7238 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
7239 if 20 + body_len > raw.len() {
7240 return Err(InvocationError::Deserialize(
7241 "plaintext frame: body_len exceeds frame size".into(),
7242 ));
7243 }
7244 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
7245 T::deserialize(&mut cur).map_err(Into::into)
7246}
7247
/// What `unwrap_envelope` found after peeling wrappers off a message body.
enum EnvelopeResult {
    /// A body handed back as-is; produced for constructor ids that
    /// `unwrap_envelope` has no dedicated handling for.
    Payload(Vec<u8>),
    /// One or more bodies whose constructor id is one of the update ids,
    /// returned unparsed.
    RawUpdates(Vec<Vec<u8>>),
    /// The `pts` and `pts_count` fields of an `updateShortSentMessage`.
    Pts(i32, i32),
    /// Nothing to deliver.
    None,
}
7258
7259fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
7260 if body.len() < 4 {
7261 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
7262 }
7263 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
7264
7265 match cid {
7266 ID_RPC_RESULT => {
7267 if body.len() < 12 {
7268 return Err(InvocationError::Deserialize("rpc_result too short".into()));
7269 }
7270 unwrap_envelope(body[12..].to_vec())
7271 }
7272 ID_RPC_ERROR => {
7273 if body.len() < 8 {
7274 return Err(InvocationError::Deserialize("rpc_error too short".into()));
7275 }
7276 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
7277 let message = tl_read_string(&body[8..]).unwrap_or_default();
7278 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
7279 }
7280 ID_MSG_CONTAINER => {
7281 if body.len() < 8 {
7282 return Err(InvocationError::Deserialize("container too short".into()));
7283 }
7284 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
7285 let mut pos = 8usize;
7286 let mut payload: Option<Vec<u8>> = None;
7287 let mut raw_updates: Vec<Vec<u8>> = Vec::new();
7288
7289 for _ in 0..count {
7290 if pos + 16 > body.len() { break; }
7291 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
7292 pos += 16;
7293 if pos + inner_len > body.len() { break; }
7294 let inner = body[pos..pos + inner_len].to_vec();
7295 pos += inner_len;
7296 match unwrap_envelope(inner)? {
7297 EnvelopeResult::Payload(p) => { payload = Some(p); }
7298 EnvelopeResult::RawUpdates(mut raws) => { raw_updates.append(&mut raws); }
7299 EnvelopeResult::Pts(_, _) => {} EnvelopeResult::None => {}
7301 }
7302 }
7303 if let Some(p) = payload {
7304 Ok(EnvelopeResult::Payload(p))
7305 } else if !raw_updates.is_empty() {
7306 Ok(EnvelopeResult::RawUpdates(raw_updates))
7307 } else {
7308 Ok(EnvelopeResult::None)
7309 }
7310 }
7311 ID_GZIP_PACKED => {
7312 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
7313 unwrap_envelope(gz_inflate(&bytes)?)
7314 }
7315 ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
7320 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
7333 Ok(EnvelopeResult::None)
7334 }
7335 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
7341 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
7342 | ID_UPDATES_TOO_LONG => {
7343 Ok(EnvelopeResult::RawUpdates(vec![body]))
7344 }
7345 ID_UPDATE_SHORT_SENT_MSG => {
7352 let mut cur = Cursor::from_slice(&body[4..]);
7353 match tl::types::UpdateShortSentMessage::deserialize(&mut cur) {
7354 Ok(m) => {
7355 tracing::debug!(
7356 "[layer] updateShortSentMessage (RPC): pts={} pts_count={}: advancing pts",
7357 m.pts, m.pts_count
7358 );
7359 Ok(EnvelopeResult::Pts(m.pts, m.pts_count))
7360 }
7361 Err(e) => {
7362 tracing::debug!("[layer] updateShortSentMessage deserialize error: {e}");
7363 Ok(EnvelopeResult::None)
7364 }
7365 }
7366 }
7367 _ => Ok(EnvelopeResult::Payload(body)),
7368 }
7369}
7370
7371fn random_i64() -> i64 {
7374 let mut b = [0u8; 8];
7375 getrandom::getrandom(&mut b).expect("getrandom");
7376 i64::from_le_bytes(b)
7377}
7378
7379fn jitter_delay(base_ms: u64) -> Duration {
7383 let mut b = [0u8; 2];
7385 getrandom::getrandom(&mut b).unwrap_or(());
7386 let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; let factor = 0.80 + rand_frac * 0.40; Duration::from_millis((base_ms as f64 * factor) as u64)
7389}
7390
/// Decodes a length-prefixed byte string from the start of `data`.
///
/// A first byte below 254 is the length itself and the content starts at
/// offset 1. Any other first byte selects the long form: bytes 1..4 hold a
/// 24-bit little-endian length and the content starts at offset 4. Trailing
/// bytes after the content (such as alignment padding) are ignored.
///
/// Returns an empty vector for empty input, and `None` when `data` is too
/// short for the announced header or content.
pub(crate) fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let Some(&first) = data.first() else {
        return Some(Vec::new());
    };
    let (len, start) = match first {
        0..=253 => (usize::from(first), 1),
        _ => {
            let b = data.get(1..4)?;
            (
                usize::from(b[0]) | usize::from(b[1]) << 8 | usize::from(b[2]) << 16,
                4,
            )
        }
    };
    data.get(start..start + len).map(<[u8]>::to_vec)
}
7410
7411fn tl_read_string(data: &[u8]) -> Option<String> {
7412 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
7413}
7414
7415pub(crate) fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
7416 use std::io::Read;
7417 let mut out = Vec::new();
7418 if flate2::read::GzDecoder::new(data)
7419 .read_to_end(&mut out)
7420 .is_ok()
7421 && !out.is_empty()
7422 {
7423 return Ok(out);
7424 }
7425 out.clear();
7426 flate2::read::ZlibDecoder::new(data)
7427 .read_to_end(&mut out)
7428 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
7429 Ok(out)
7430}
7431
7432pub(crate) fn maybe_gz_decompress(body: Vec<u8>) -> Result<Vec<u8>, InvocationError> {
7433 const ID_GZIP_PACKED_LOCAL: u32 = 0x3072cfa1;
7434 if body.len() >= 4 && u32::from_le_bytes(body[0..4].try_into().unwrap()) == ID_GZIP_PACKED_LOCAL
7435 {
7436 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
7437 gz_inflate(&bytes)
7438 } else {
7439 Ok(body)
7440 }
7441}
7442
/// Size limit in bytes used by `maybe_gz_pack`: bodies of at most this length
/// are returned uncompressed without attempting compression.
const COMPRESSION_THRESHOLD: usize = 512;
7448
/// Encodes `data` as a length-prefixed byte string padded to a multiple of four bytes.
///
/// Lengths below 254 use a one-byte length header. Longer inputs use `0xfe`
/// followed by the low three bytes of the length in little-endian order.
/// Zero bytes are appended until the total size is a multiple of four.
fn tl_write_bytes(data: &[u8]) -> Vec<u8> {
    let len = data.len();
    let mut out = Vec::with_capacity(4 + len);

    let header_len = if len < 254 {
        out.push(len as u8);
        1
    } else {
        out.push(0xfe);
        out.extend_from_slice(&(len as u32).to_le_bytes()[..3]);
        4
    };
    out.extend_from_slice(data);

    // Zero-fill up to the next 4-byte boundary.
    let padded_len = (header_len + len).next_multiple_of(4);
    out.resize(padded_len, 0);
    out
}
7467
7468fn gz_pack_body(data: &[u8]) -> Vec<u8> {
7470 use std::io::Write;
7471 let mut enc = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
7472 let _ = enc.write_all(data);
7473 let compressed = enc.finish().unwrap_or_default();
7474 let mut out = Vec::with_capacity(4 + 4 + compressed.len());
7475 out.extend_from_slice(&ID_GZIP_PACKED.to_le_bytes());
7476 out.extend(tl_write_bytes(&compressed));
7477 out
7478}
7479
7480fn maybe_gz_pack(data: &[u8]) -> Vec<u8> {
7483 if data.len() <= COMPRESSION_THRESHOLD {
7484 return data.to_vec();
7485 }
7486 let packed = gz_pack_body(data);
7487 if packed.len() < data.len() {
7488 packed
7489 } else {
7490 data.to_vec()
7491 }
7492}
7493
7494fn build_msgs_ack_body(msg_ids: &[i64]) -> Vec<u8> {
7498 let mut out = Vec::with_capacity(4 + 4 + 4 + msg_ids.len() * 8);
7501 out.extend_from_slice(&ID_MSGS_ACK.to_le_bytes());
7502 out.extend_from_slice(&0x1cb5c415_u32.to_le_bytes()); out.extend_from_slice(&(msg_ids.len() as u32).to_le_bytes());
7504 for &id in msg_ids {
7505 out.extend_from_slice(&id.to_le_bytes());
7506 }
7507 out
7508}
7509
7510fn build_container_body(messages: &[(i64, i32, &[u8])]) -> Vec<u8> {
7518 let total_body: usize = messages.iter().map(|(_, _, b)| 16 + b.len()).sum();
7519 let mut out = Vec::with_capacity(8 + total_body);
7520 out.extend_from_slice(&ID_MSG_CONTAINER.to_le_bytes());
7521 out.extend_from_slice(&(messages.len() as u32).to_le_bytes());
7522 for &(msg_id, seqno, body) in messages {
7523 out.extend_from_slice(&msg_id.to_le_bytes());
7524 out.extend_from_slice(&seqno.to_le_bytes());
7525 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
7526 out.extend_from_slice(body);
7527 }
7528 out
7529}