1#![doc(html_root_url = "https://docs.rs/layer-client/0.4.0")]
2#![deny(unsafe_code)]
22
23mod errors;
24mod retry;
25mod session;
26mod transport;
27mod two_factor_auth;
28pub mod update;
29pub mod parsers;
30pub mod media;
31pub mod participants;
32pub mod pts;
33
34pub mod dc_pool;
36pub mod transport_obfuscated;
37pub mod transport_intermediate;
38pub mod socks5;
39pub mod session_backend;
40pub mod inline_iter;
41pub mod typing_guard;
42pub mod keyboard;
43pub mod search;
44pub mod types;
45
46#[macro_use]
47pub mod macros;
48
49#[cfg(test)]
50mod pts_tests;
51
52pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
53pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
54pub use update::Update;
55pub use media::{UploadedFile, DownloadIter, Photo, Document, Sticker, Downloadable};
56pub use participants::Participant;
57pub use typing_guard::TypingGuard;
58pub use socks5::Socks5Config;
59pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend, StringSessionBackend};
60pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
61pub use search::{SearchBuilder, GlobalSearchBuilder};
62pub use types::{User, Group, Channel, Chat};
63
64pub use layer_tl_types as tl;
67
68use std::collections::HashMap;
69use std::collections::VecDeque;
70use std::num::NonZeroU32;
71use std::ops::ControlFlow;
72use std::sync::Arc;
73use std::time::Duration;
74
75use layer_mtproto::{EncryptedSession, Session, authentication as auth};
76use layer_tl_types::{Cursor, Deserializable, RemoteCall};
77use session::{DcEntry, PersistedSession};
78use tokio::io::{AsyncReadExt, AsyncWriteExt};
79use tokio::net::TcpStream;
80use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
81use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
82use tokio::time::sleep;
83use tokio_util::sync::CancellationToken;
84use socket2::TcpKeepalive;
85
// ── TL constructor ids ───────────────────────────────────────────────────────
// 32-bit constructor ids of MTProto service messages. The lowercase names in the
// trailing comments are the constructor names from the public MTProto schema.
const ID_RPC_RESULT: u32 = 0xf35c6d01; // rpc_result
const ID_RPC_ERROR: u32 = 0x2144ca19; // rpc_error
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc; // msg_container
const ID_GZIP_PACKED: u32 = 0x3072cfa1; // gzip_packed
const ID_PONG: u32 = 0x347773c5; // pong
const ID_MSGS_ACK: u32 = 0x62d6b459; // msgs_ack
const ID_BAD_SERVER_SALT: u32 = 0xedab447b; // bad_server_salt
const ID_NEW_SESSION: u32 = 0x9ec20908; // new_session_created
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811; // bad_msg_notification
const ID_FUTURE_SALTS: u32 = 0xae500895; // future_salts
const ID_MSG_DETAILED_INFO: u32 = 0x276d3ec6; // msg_detailed_info
const ID_MSG_NEW_DETAIL_INFO: u32 = 0x809db6df; // msg_new_detailed_info
const ID_MSG_RESEND_REQ: u32 = 0x7d861a08; // msg_resend_req
// Constructor ids of the API-level `Updates` variants.
const ID_UPDATES: u32 = 0x74ae4240; // updates
const ID_UPDATE_SHORT: u32 = 0x78d4dec1; // updateShort
const ID_UPDATES_COMBINED: u32 = 0x725b04c3; // updatesCombined
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8; // updateShortMessage
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5; // updateShortChatMessage
const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e; // updatesTooLong

// NOTE(review): presumably the interval, in seconds, between keepalive pings —
// the code that reads this constant is not in this part of the file; confirm.
const PING_DELAY_SECS: u64 = 60;

// NOTE(review): presumably the server-side disconnect delay (seconds) sent along
// with each ping — confirm against the ping sender.
const NO_PING_DISCONNECT: i32 = 75;

/// Initial delay, in milliseconds, before a reconnect attempt; the supervisor
/// doubles it after each failure.
const RECONNECT_BASE_MS: u64 = 500;

/// Upper bound, in seconds, on the reconnect back-off delay (applied before jitter).
const RECONNECT_MAX_SECS: u64 = 5;

// TCP keepalive tuning. NOTE(review): presumably fed into `socket2::TcpKeepalive`
// (idle time, probe interval, probe count) when the socket is configured — the
// call site is not in this part of the file; confirm.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
const TCP_KEEPALIVE_PROBES: u32 = 3;
138
/// In-memory cache of peer access hashes, keyed by peer id.
///
/// Populated from `User` / `Chat` objects seen in responses and used to build
/// `InputPeer` values without asking the caller for an access hash.
#[derive(Debug, Clone, Default)]
pub struct PeerCache {
    /// User id → access hash.
    pub users: HashMap<i64, i64>,
    /// Channel id → access hash.
    pub channels: HashMap<i64, i64>,
}
153
154impl PeerCache {
155 fn cache_user(&mut self, user: &tl::enums::User) {
156 if let tl::enums::User::User(u) = user
157 && let Some(hash) = u.access_hash {
158 self.users.insert(u.id, hash);
159 }
160 }
161
162 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
163 match chat {
164 tl::enums::Chat::Channel(c) => {
165 if let Some(hash) = c.access_hash {
166 self.channels.insert(c.id, hash);
167 }
168 }
169 tl::enums::Chat::ChannelForbidden(c) => {
170 self.channels.insert(c.id, c.access_hash);
171 }
172 _ => {}
173 }
174 }
175
176 fn cache_users(&mut self, users: &[tl::enums::User]) {
177 for u in users { self.cache_user(u); }
178 }
179
180 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
181 for c in chats { self.cache_chat(c); }
182 }
183
184 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
185 if user_id == 0 {
186 return tl::enums::InputPeer::PeerSelf;
187 }
188 let hash = self.users.get(&user_id).copied().unwrap_or(0);
189 tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
190 }
191
192 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
193 let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
194 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
195 }
196
197 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
198 match peer {
199 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
200 tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
201 tl::types::InputPeerChat { chat_id: c.chat_id }
202 ),
203 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
204 }
205 }
206}
207
/// Builder-style description of an outgoing message.
///
/// Construct with [`InputMessage::text`] (or `From<&str>` / `From<String>`) and
/// refine with the chained setters on the `impl` block.
// NOTE(review): the request that consumes these fields is not in this part of the
// file; the flag descriptions below follow the field names — confirm against the sender.
#[derive(Clone, Default)]
pub struct InputMessage {
    /// Message text.
    pub text: String,
    /// Id of the message being replied to, if any (becomes `reply_to_msg_id`).
    pub reply_to: Option<i32>,
    /// Presumably: send without notification.
    pub silent: bool,
    /// Presumably: send as a background message.
    pub background: bool,
    /// Presumably: clear the chat draft when sending.
    pub clear_draft: bool,
    /// Presumably: disable link previews.
    pub no_webpage: bool,
    /// Presumably: place media below the caption.
    pub invert_media: bool,
    /// Set by [`InputMessage::schedule_once_online`], which also clears `schedule_date`.
    pub schedule_once_online: bool,
    /// Explicit formatting entities for `text`, if any.
    pub entities: Option<Vec<tl::enums::MessageEntity>>,
    /// Reply markup (keyboard) attached to the message, if any.
    pub reply_markup: Option<tl::enums::ReplyMarkup>,
    /// Scheduled send time, if any. Presumably a Unix timestamp — confirm.
    pub schedule_date: Option<i32>,
    /// Media attached to the message, if any.
    pub media: Option<tl::enums::InputMedia>,
}
238
239impl InputMessage {
240 pub fn text(text: impl Into<String>) -> Self {
242 Self { text: text.into(), ..Default::default() }
243 }
244
245 pub fn set_text(mut self, text: impl Into<String>) -> Self {
247 self.text = text.into(); self
248 }
249
250 pub fn reply_to(mut self, id: Option<i32>) -> Self {
252 self.reply_to = id; self
253 }
254
255 pub fn silent(mut self, v: bool) -> Self {
257 self.silent = v; self
258 }
259
260 pub fn background(mut self, v: bool) -> Self {
262 self.background = v; self
263 }
264
265 pub fn clear_draft(mut self, v: bool) -> Self {
267 self.clear_draft = v; self
268 }
269
270 pub fn no_webpage(mut self, v: bool) -> Self {
272 self.no_webpage = v; self
273 }
274
275 pub fn invert_media(mut self, v: bool) -> Self {
277 self.invert_media = v; self
278 }
279
280 pub fn schedule_once_online(mut self) -> Self {
285 self.schedule_once_online = true;
286 self.schedule_date = None;
287 self
288 }
289
290 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
292 self.entities = Some(e); self
293 }
294
295 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
297 self.reply_markup = Some(rm); self
298 }
299
300 pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
310 self.reply_markup = Some(kb.into()); self
311 }
312
313 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
315 self.schedule_date = ts; self
316 }
317
318 pub fn copy_media(mut self, media: tl::enums::InputMedia) -> Self {
334 self.media = Some(media); self
335 }
336
337 pub fn clear_media(mut self) -> Self {
339 self.media = None; self
340 }
341
342 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
343 self.reply_to.map(|id| {
344 tl::enums::InputReplyTo::Message(
345 tl::types::InputReplyToMessage {
346 reply_to_msg_id: id,
347 top_msg_id: None,
348 reply_to_peer_id: None,
349 quote_text: None,
350 quote_entities: None,
351 quote_offset: None,
352 monoforum_peer_id: None,
353 todo_item_id: None,
354 poll_option: None,
355 }
356 )
357 })
358 }
359}
360
361impl From<&str> for InputMessage {
362 fn from(s: &str) -> Self { Self::text(s) }
363}
364
365impl From<String> for InputMessage {
366 fn from(s: String) -> Self { Self::text(s) }
367}
368
/// Wire framing used for the connection to the server.
///
/// The variant names match the MTProto transport names; the default is
/// [`TransportKind::Abridged`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum TransportKind {
    /// Abridged framing (the default).
    #[default]
    Abridged,
    /// Intermediate framing.
    Intermediate,
    /// Full framing.
    Full,
    /// Obfuscated transport with an optional 16-byte secret.
    // NOTE(review): presumably an MTProxy-style secret; confirm against the
    // obfuscated transport module.
    Obfuscated { secret: Option<[u8; 16]> },
}
402
/// Cancellation handle returned by `Client::connect` alongside the client.
///
/// It is a plain `tokio_util` `CancellationToken`; cancelling it makes the
/// background reader task fail all pending requests and exit.
pub type ShutdownToken = CancellationToken;
423
/// Settings consumed by `Client::connect`.
#[derive(Clone)]
pub struct Config {
    /// Application id sent with authentication requests.
    pub api_id: i32,
    /// Application hash sent with authentication requests.
    pub api_hash: String,
    /// Optional data-centre address override.
    // NOTE(review): not read by the `connect` code visible in this part of the file.
    pub dc_addr: Option<String>,
    /// Policy deciding how failed requests are retried.
    pub retry_policy: Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy used for every connection.
    pub socks5: Option<crate::socks5::Socks5Config>,
    /// Whether IPv6 may be used. Presumably consulted when picking DC addresses — confirm.
    pub allow_ipv6: bool,
    /// Transport framing used for every connection.
    pub transport: TransportKind,
    /// Storage the session is loaded from on connect and written to by `save_session`.
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// When `true` and the loaded session holds an initialised update state,
    /// `connect` restores that state and replays missed updates in the background.
    pub catch_up: bool,
}
444
445impl Config {
446 pub fn with_string_session(s: impl Into<String>) -> Self {
461 Config {
462 session_backend: Arc::new(
463 crate::session_backend::StringSessionBackend::new(s)
464 ),
465 ..Config::default()
466 }
467 }
468}
469
470impl Default for Config {
471 fn default() -> Self {
472 Self {
473 api_id: 0,
474 api_hash: String::new(),
475 dc_addr: None,
476 retry_policy: Arc::new(AutoSleep::default()),
477 socks5: None,
478 allow_ipv6: false,
479 transport: TransportKind::Abridged,
480 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
481 catch_up: false,
482 }
483 }
484}
485
/// Receiving end of the update channel handed out by `Client::stream_updates`.
pub struct UpdateStream {
    /// Updates forwarded from the client's internal queue.
    rx: mpsc::UnboundedReceiver<update::Update>,
}
493
494impl UpdateStream {
495 pub async fn next(&mut self) -> Option<update::Update> {
497 self.rx.recv().await
498 }
499
500 pub async fn next_raw(&mut self) -> Option<update::RawUpdate> {
506 loop {
507 match self.rx.recv().await? {
508 update::Update::Raw(r) => return Some(r),
509 _ => continue,
510 }
511 }
512 }
513}
514
/// One entry of the dialog list together with the objects it refers to.
#[derive(Debug, Clone)]
pub struct Dialog {
    /// The raw dialog (or folder) object.
    pub raw: tl::enums::Dialog,
    /// A message associated with the dialog, when available.
    // NOTE(review): presumably the dialog's top message — confirm against the constructor.
    pub message: Option<tl::enums::Message>,
    /// The user this dialog is with, for private chats.
    pub entity: Option<tl::enums::User>,
    /// The chat or channel this dialog belongs to, for group dialogs.
    pub chat: Option<tl::enums::Chat>,
}
525
526impl Dialog {
527 pub fn title(&self) -> String {
529 if let Some(tl::enums::User::User(u)) = &self.entity {
530 let first = u.first_name.as_deref().unwrap_or("");
531 let last = u.last_name.as_deref().unwrap_or("");
532 let name = format!("{first} {last}").trim().to_string();
533 if !name.is_empty() { return name; }
534 }
535 if let Some(chat) = &self.chat {
536 return match chat {
537 tl::enums::Chat::Chat(c) => c.title.clone(),
538 tl::enums::Chat::Forbidden(c) => c.title.clone(),
539 tl::enums::Chat::Channel(c) => c.title.clone(),
540 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
541 tl::enums::Chat::Empty(_) => "(empty)".into(),
542 };
543 }
544 "(Unknown)".to_string()
545 }
546
547 pub fn peer(&self) -> Option<&tl::enums::Peer> {
549 match &self.raw {
550 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
551 tl::enums::Dialog::Folder(_) => None,
552 }
553 }
554
555 pub fn unread_count(&self) -> i32 {
557 match &self.raw {
558 tl::enums::Dialog::Dialog(d) => d.unread_count,
559 _ => 0,
560 }
561 }
562
563 pub fn top_message(&self) -> i32 {
565 match &self.raw {
566 tl::enums::Dialog::Dialog(d) => d.top_message,
567 _ => 0,
568 }
569 }
570}
571
/// Shared state behind every clone of [`Client`].
struct ClientInner {
    /// Write side of the current connection, including its encryption state.
    writer: Mutex<ConnectionWriter>,
    /// Requests awaiting a response: each sender is completed with the raw
    /// response body or an error. The reader task drains the map on connection
    /// loss and on shutdown.
    // NOTE(review): the i64 key is presumably the request's msg_id — confirm.
    #[allow(clippy::type_complexity)]
    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Hands a replacement connection (read half, frame kind, auth key,
    /// session id) to the reader task.
    reconnect_tx: mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Signalled by `signal_network_restored`; received by the reader task.
    network_hint_tx: mpsc::UnboundedSender<()>,
    /// Clone of the token returned from `connect`.
    #[allow(dead_code)]
    shutdown_token: CancellationToken,
    /// Copy of `Config::catch_up`.
    #[allow(dead_code)]
    catch_up: bool,
    /// Id of the data centre the main connection talks to.
    home_dc_id: Mutex<i32>,
    /// Known data centres, keyed by DC id.
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache for users and channels.
    pub peer_cache: RwLock<PeerCache>,
    /// Current update-sequence state (pts, qts, date, seq, per-channel pts).
    pub pts_state: Mutex<pts::PtsState>,
    /// Buffer for updates involved in a possible gap.
    // NOTE(review): semantics are defined in the `pts` module, which is not in
    // this part of the file.
    pub possible_gap: Mutex<pts::PossibleGapBuffer>,
    /// Copy of `Config::api_id`.
    api_id: i32,
    /// Copy of `Config::api_hash`.
    api_hash: String,
    /// Copy of `Config::retry_policy`.
    retry_policy: Arc<dyn RetryPolicy>,
    /// Copy of `Config::socks5`.
    socks5: Option<crate::socks5::Socks5Config>,
    /// Copy of `Config::allow_ipv6`.
    allow_ipv6: bool,
    /// Copy of `Config::transport`.
    transport: TransportKind,
    /// Backend used to persist the session.
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Pool of connections to other data centres; its keys are merged into the
    /// saved session.
    dc_pool: Mutex<dc_pool::DcPool>,
    /// Producer side of the internal update queue.
    update_tx: mpsc::UnboundedSender<update::Update>,
}
612
/// Cheaply clonable handle to a connected client; all clones share one
/// [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    /// Shared connection and session state.
    pub(crate) inner: Arc<ClientInner>,
    /// Consumer side of the internal update queue, drained by the task that
    /// `stream_updates` spawns.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
619
620impl Client {
621 pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
624 let (update_tx, update_rx) = mpsc::unbounded_channel();
625
626 let socks5 = config.socks5.clone();
628 let transport = config.transport.clone();
629
630 let (conn, home_dc_id, dc_opts, loaded_session) =
631 match config.session_backend.load()
632 .map_err(InvocationError::Io)?
633 {
634 Some(s) => {
635 if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
636 if let Some(key) = dc.auth_key {
637 log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
638 match Connection::connect_with_key(
639 &dc.addr, key, dc.first_salt, dc.time_offset,
640 socks5.as_ref(), &transport,
641 ).await {
642 Ok(c) => {
643 let mut opts = session::default_dc_addresses()
644 .into_iter()
645 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
646 .collect::<HashMap<_, _>>();
647 for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
648 (c, s.home_dc_id, opts, Some(s))
649 }
650 Err(e) => {
651 log::warn!("[layer] Session connect failed ({e}), fresh connect …");
652 let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
653 (c, dc, opts, None)
654 }
655 }
656 } else {
657 let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
658 (c, dc, opts, None)
659 }
660 } else {
661 let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
662 (c, dc, opts, None)
663 }
664 }
665 None => {
666 let (c, dc, opts) = Self::fresh_connect(socks5.as_ref(), &transport).await?;
667 (c, dc, opts, None)
668 }
669 };
670
671 let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());
673
674 let (writer, read_half, frame_kind) = conn.into_writer();
679 let auth_key = writer.enc.auth_key_bytes();
680 let session_id = writer.enc.session_id();
681
682 #[allow(clippy::type_complexity)]
683 let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
684 Arc::new(Mutex::new(HashMap::new()));
685
686 let (reconnect_tx, reconnect_rx) =
688 mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();
689
690 let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();
693
694
695 let shutdown_token = CancellationToken::new();
697 let catch_up = config.catch_up;
698
699 let inner = Arc::new(ClientInner {
700 writer: Mutex::new(writer),
701 pending: pending.clone(),
702 reconnect_tx,
703 network_hint_tx,
704 shutdown_token: shutdown_token.clone(),
705 catch_up,
706 home_dc_id: Mutex::new(home_dc_id),
707 dc_options: Mutex::new(dc_opts),
708 peer_cache: RwLock::new(PeerCache::default()),
709 pts_state: Mutex::new(pts::PtsState::default()),
710 possible_gap: Mutex::new(pts::PossibleGapBuffer::new()),
711 api_id: config.api_id,
712 api_hash: config.api_hash,
713 retry_policy: config.retry_policy,
714 socks5: config.socks5,
715 allow_ipv6: config.allow_ipv6,
716 transport: config.transport,
717 session_backend: config.session_backend,
718 dc_pool: Mutex::new(pool),
719 update_tx,
720 });
721
722 let client = Self {
723 inner,
724 _update_rx: Arc::new(Mutex::new(update_rx)),
725 };
726
727 {
730 let client_r = client.clone();
731 let shutdown_r = shutdown_token.clone();
732 tokio::spawn(async move {
733 client_r.run_reader_task(
734 read_half, frame_kind, auth_key, session_id,
735 reconnect_rx, network_hint_rx, shutdown_r,
736 ).await;
737 });
738 }
739
740 if let Err(e) = client.init_connection().await {
743 log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");
744
745 let socks5_r = client.inner.socks5.clone();
746 let transport_r = client.inner.transport.clone();
747 let (new_conn, new_dc_id, new_opts) =
748 Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;
749
750 {
751 let mut dc_guard = client.inner.home_dc_id.lock().await;
752 *dc_guard = new_dc_id;
753 }
754 {
755 let mut opts_guard = client.inner.dc_options.lock().await;
756 *opts_guard = new_opts;
757 }
758
759 let (new_writer, new_read, new_fk) = new_conn.into_writer();
761 let new_ak = new_writer.enc.auth_key_bytes();
762 let new_sid = new_writer.enc.session_id();
763 *client.inner.writer.lock().await = new_writer;
764 let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));
765
766 client.init_connection().await?;
767 }
768
769 if let Some(ref s) = loaded_session
771 && !s.peers.is_empty() {
772 let mut cache = client.inner.peer_cache.write().await;
773 for p in &s.peers {
774 if p.is_channel {
775 cache.channels.entry(p.id).or_insert(p.access_hash);
776 } else {
777 cache.users.entry(p.id).or_insert(p.access_hash);
778 }
779 }
780 log::debug!("[layer] Peer cache restored: {} users, {} channels",
781 cache.users.len(), cache.channels.len());
782 }
783
784 let has_saved_state = loaded_session
794 .as_ref()
795 .is_some_and(|s| s.updates_state.is_initialised());
796
797 if catch_up && has_saved_state {
798 let snap = &loaded_session.as_ref().unwrap().updates_state;
799 let mut state = client.inner.pts_state.lock().await;
800 state.pts = snap.pts;
801 state.qts = snap.qts;
802 state.date = snap.date;
803 state.seq = snap.seq;
804 for &(cid, cpts) in &snap.channels {
805 state.channel_pts.insert(cid, cpts);
806 }
807 log::info!("[layer] Update state restored: pts={}, qts={}, seq={}, {} channels",
808 state.pts, state.qts, state.seq, state.channel_pts.len());
809 drop(state);
810
811 let channel_ids: Vec<i64> = snap.channels.iter().map(|&(cid, _)| cid).collect();
815
816 let c = client.clone();
819 let utx = client.inner.update_tx.clone();
820 tokio::spawn(async move {
821 match c.get_difference().await {
823 Ok(missed) => {
824 log::info!("[layer] catch_up: {} global updates replayed", missed.len());
825 for u in missed { let _ = utx.send(u); }
826 }
827 Err(e) => log::warn!("[layer] catch_up getDifference: {e}"),
828 }
829
830 if !channel_ids.is_empty() {
834 log::info!("[layer] catch_up: per-channel diff for {} channels", channel_ids.len());
835 for channel_id in channel_ids {
836 let c2 = c.clone();
837 let utx2 = utx.clone();
838 tokio::spawn(async move {
839 match c2.get_channel_difference(channel_id).await {
840 Ok(updates) => {
841 if !updates.is_empty() {
842 log::info!("[layer] catch_up channel {channel_id}: {} updates", updates.len());
843 }
844 for u in updates { let _ = utx2.send(u); }
845 }
846 Err(e) => log::warn!("[layer] catch_up channel {channel_id}: {e}"),
847 }
848 });
849 }
850 }
851 });
852 } else {
853 let _ = client.sync_pts_state().await;
855 }
856
857 Ok((client, shutdown_token))
858 }
859
860 async fn fresh_connect(
861 socks5: Option<&crate::socks5::Socks5Config>,
862 transport: &TransportKind,
863 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
864 log::info!("[layer] Fresh connect to DC2 …");
865 let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
866 let opts = session::default_dc_addresses()
867 .into_iter()
868 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
869 .collect();
870 Ok((conn, 2, opts))
871 }
872
873 pub async fn save_session(&self) -> Result<(), InvocationError> {
876 use session::{CachedPeer, UpdatesStateSnap};
877
878 let writer_guard = self.inner.writer.lock().await;
879 let home_dc_id = *self.inner.home_dc_id.lock().await;
880 let dc_options = self.inner.dc_options.lock().await;
881
882 let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
883 dc_id: e.dc_id,
884 addr: e.addr.clone(),
885 auth_key: if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
886 first_salt: if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
887 time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
888 }).collect();
889 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
890
891 let pts_snap = {
893 let s = self.inner.pts_state.lock().await;
894 UpdatesStateSnap {
895 pts: s.pts,
896 qts: s.qts,
897 date: s.date,
898 seq: s.seq,
899 channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
900 }
901 };
902
903 let peers: Vec<CachedPeer> = {
905 let cache = self.inner.peer_cache.read().await;
906 let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
907 for (&id, &hash) in &cache.users { v.push(CachedPeer { id, access_hash: hash, is_channel: false }); }
908 for (&id, &hash) in &cache.channels { v.push(CachedPeer { id, access_hash: hash, is_channel: true }); }
909 v
910 };
911
912 self.inner.session_backend
913 .save(&PersistedSession { home_dc_id, dcs, updates_state: pts_snap, peers })
914 .map_err(InvocationError::Io)?;
915 log::info!("[layer] Session saved ✓");
916 Ok(())
917 }
918
919 pub async fn export_session_string(&self) -> Result<String, InvocationError> {
929 use session::{CachedPeer, UpdatesStateSnap};
930
931 let writer_guard = self.inner.writer.lock().await;
932 let home_dc_id = *self.inner.home_dc_id.lock().await;
933 let dc_options = self.inner.dc_options.lock().await;
934
935 let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
936 dc_id: e.dc_id,
937 addr: e.addr.clone(),
938 auth_key: if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
939 first_salt: if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
940 time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
941 }).collect();
942 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
943
944 let pts_snap = {
945 let s = self.inner.pts_state.lock().await;
946 UpdatesStateSnap {
947 pts: s.pts,
948 qts: s.qts,
949 date: s.date,
950 seq: s.seq,
951 channels: s.channel_pts.iter().map(|(&k, &v)| (k, v)).collect(),
952 }
953 };
954
955 let peers: Vec<CachedPeer> = {
956 let cache = self.inner.peer_cache.read().await;
957 let mut v = Vec::with_capacity(cache.users.len() + cache.channels.len());
958 for (&id, &hash) in &cache.users { v.push(CachedPeer { id, access_hash: hash, is_channel: false }); }
959 for (&id, &hash) in &cache.channels { v.push(CachedPeer { id, access_hash: hash, is_channel: true }); }
960 v
961 };
962
963 let session = PersistedSession { home_dc_id, dcs, updates_state: pts_snap, peers };
964 Ok(session.to_string())
965 }
966
967
968
969 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
971 match self.invoke(&tl::functions::updates::GetState {}).await {
972 Ok(_) => Ok(true),
973 Err(e) if e.is("AUTH_KEY_UNREGISTERED")
974 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
975 Err(e) => Err(e),
976 }
977 }
978
979 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
981 let req = tl::functions::auth::ImportBotAuthorization {
982 flags: 0, api_id: self.inner.api_id,
983 api_hash: self.inner.api_hash.clone(),
984 bot_auth_token: token.to_string(),
985 };
986
987 let result = match self.invoke(&req).await {
988 Ok(x) => x,
989 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
990 let dc_id = r.value.unwrap_or(2) as i32;
991 self.migrate_to(dc_id).await?;
992 self.invoke(&req).await?
993 }
994 Err(e) => return Err(e),
995 };
996
997 let name = match result {
998 tl::enums::auth::Authorization::Authorization(a) => {
999 self.cache_user(&a.user).await;
1000 Self::extract_user_name(&a.user)
1001 }
1002 tl::enums::auth::Authorization::SignUpRequired(_) => {
1003 panic!("unexpected SignUpRequired during bot sign-in")
1004 }
1005 };
1006 log::info!("[layer] Bot signed in ✓ ({name})");
1007 Ok(name)
1008 }
1009
1010 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
1012 use tl::enums::auth::SentCode;
1013
1014 let req = self.make_send_code_req(phone);
1015 let body = match self.rpc_call_raw(&req).await {
1016 Ok(b) => b,
1017 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
1018 let dc_id = r.value.unwrap_or(2) as i32;
1019 self.migrate_to(dc_id).await?;
1020 self.rpc_call_raw(&req).await?
1021 }
1022 Err(e) => return Err(e),
1023 };
1024
1025 let mut cur = Cursor::from_slice(&body);
1026 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
1027 SentCode::SentCode(s) => s.phone_code_hash,
1028 SentCode::Success(_) => return Err(InvocationError::Deserialize("unexpected Success".into())),
1029 SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
1030 };
1031 log::info!("[layer] Login code sent");
1032 Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
1033 }
1034
1035 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
1037 let req = tl::functions::auth::SignIn {
1038 phone_number: token.phone.clone(),
1039 phone_code_hash: token.phone_code_hash.clone(),
1040 phone_code: Some(code.trim().to_string()),
1041 email_verification: None,
1042 };
1043
1044 let body = match self.rpc_call_raw(&req).await {
1045 Ok(b) => b,
1046 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
1047 let dc_id = r.value.unwrap_or(2) as i32;
1048 self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
1049 self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
1050 }
1051 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
1052 let t = self.get_password_info().await.map_err(SignInError::Other)?;
1053 return Err(SignInError::PasswordRequired(Box::new(t)));
1054 }
1055 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
1056 Err(e) => return Err(SignInError::Other(e)),
1057 };
1058
1059 let mut cur = Cursor::from_slice(&body);
1060 match tl::enums::auth::Authorization::deserialize(&mut cur)
1061 .map_err(|e| SignInError::Other(e.into()))?
1062 {
1063 tl::enums::auth::Authorization::Authorization(a) => {
1064 self.cache_user(&a.user).await;
1065 let name = Self::extract_user_name(&a.user);
1066 log::info!("[layer] Signed in ✓ Welcome, {name}!");
1067 Ok(name)
1068 }
1069 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
1070 }
1071 }
1072
1073 pub async fn check_password(
1075 &self,
1076 token: PasswordToken,
1077 password: impl AsRef<[u8]>,
1078 ) -> Result<String, InvocationError> {
1079 let pw = token.password;
1080 let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
1081 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
1082 let g_b = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
1083 let a = pw.secure_random;
1084 let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
1085
1086 let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
1087 let req = tl::functions::auth::CheckPassword {
1088 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
1089 tl::types::InputCheckPasswordSrp {
1090 srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
1091 },
1092 ),
1093 };
1094
1095 let body = self.rpc_call_raw(&req).await?;
1096 let mut cur = Cursor::from_slice(&body);
1097 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
1098 tl::enums::auth::Authorization::Authorization(a) => {
1099 self.cache_user(&a.user).await;
1100 let name = Self::extract_user_name(&a.user);
1101 log::info!("[layer] 2FA ✓ Welcome, {name}!");
1102 Ok(name)
1103 }
1104 tl::enums::auth::Authorization::SignUpRequired(_) =>
1105 Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
1106 }
1107 }
1108
1109 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
1111 let req = tl::functions::auth::LogOut {};
1112 match self.rpc_call_raw(&req).await {
1113 Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
1114 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
1115 Err(e) => Err(e),
1116 }
1117 }
1118
1119 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
1123 let req = tl::functions::users::GetUsers {
1124 id: vec![tl::enums::InputUser::UserSelf],
1125 };
1126 let body = self.rpc_call_raw(&req).await?;
1127 let mut cur = Cursor::from_slice(&body);
1128 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
1129 self.cache_users_slice(&users).await;
1130 users.into_iter().find_map(|u| match u {
1131 tl::enums::User::User(u) => Some(u),
1132 _ => None,
1133 }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
1134 }
1135
1136 pub fn stream_updates(&self) -> UpdateStream {
1144 let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
1145 let internal_rx = self._update_rx.clone();
1148 tokio::spawn(async move {
1149 let mut guard = internal_rx.lock().await;
1151 while let Some(upd) = guard.recv().await {
1152 if caller_tx.send(upd).is_err() { break; }
1153 }
1154 });
1155 UpdateStream { rx }
1156 }
1157
1158 pub fn signal_network_restored(&self) {
1171 let _ = self.inner.network_hint_tx.send(());
1172 }
1173
/// Supervisor for the connection's read side.
///
/// Runs `reader_loop` until it returns or `shutdown_token` is cancelled.
/// When the loop returns without a shutdown, every pending request is failed,
/// a new connection is established with exponential back-off, connection
/// initialisation is kicked off in a separate task, and the reader loop is
/// restarted on the new connection.
///
/// * `read_half`, `frame_kind`, `auth_key`, `session_id` — state of the initial connection.
/// * `new_conn_rx` — replacement connections pushed by other parts of the
///   client; passed through to `reader_loop`.
/// * `network_hint_rx` — "network restored" hints; passed through to `reader_loop`.
/// * `shutdown_token` — cancelling it ends the task.
#[allow(clippy::too_many_arguments)]
async fn run_reader_task(
    &self,
    read_half: OwnedReadHalf,
    frame_kind: FrameKind,
    auth_key: [u8; 256],
    session_id: i64,
    mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    mut network_hint_rx: mpsc::UnboundedReceiver<()>,
    shutdown_token: CancellationToken,
) {
    // Connection state handed to `reader_loop`; replaced after every supervisor reconnect.
    let mut rh = read_half;
    let mut fk = frame_kind;
    let mut ak = auth_key;
    let mut sid = session_id;
    // Result channel of the init task spawned after a restart, handed to the
    // next `reader_loop` run. `None` on the very first run.
    let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
    // Number of supervisor restarts so far; only used in log messages.
    let mut restart_count: u32 = 0;

    loop {
        tokio::select! {
            _ = shutdown_token.cancelled() => {
                // Shutdown: fail every in-flight request with `Dropped` and stop.
                log::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
                let mut pending = self.inner.pending.lock().await;
                for (_, tx) in pending.drain() {
                    let _ = tx.send(Err(InvocationError::Dropped));
                }
                return;
            }

            _ = self.reader_loop(
                rh, fk, ak, sid,
                restart_init_rx.take(),
                &mut new_conn_rx, &mut network_hint_rx,
            ) => {}
        }

        // The reader loop returned on its own; re-check for a shutdown that
        // raced with it.
        if shutdown_token.is_cancelled() {
            log::info!("[layer] Reader task: exiting after loop (shutdown).");
            return;
        }

        restart_count += 1;
        log::error!(
            "[layer] Reader loop exited unexpectedly (restart #{restart_count}) — supervisor reconnecting …"
        );

        {
            // Nothing can answer the in-flight requests any more: fail them all.
            let mut pending = self.inner.pending.lock().await;
            for (_, tx) in pending.drain() {
                let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
                    std::io::ErrorKind::ConnectionReset,
                    "reader task restarted",
                ))));
            }
        }
        // Forget the bodies recorded for sent messages on the old connection.
        self.inner.writer.lock().await.sent_bodies.clear();

        // Reconnect with back-off: start at RECONNECT_BASE_MS, double after
        // each failure, cap at RECONNECT_MAX_SECS, then pass through `jitter_delay`.
        let mut delay_ms = RECONNECT_BASE_MS;
        let new_conn = loop {
            log::info!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
            tokio::select! {
                _ = shutdown_token.cancelled() => {
                    log::info!("[layer] Supervisor: shutdown during reconnect, exiting.");
                    return;
                }
                _ = sleep(Duration::from_millis(delay_ms)) => {}
            }

            // NOTE(review): `do_reconnect` is given placeholder key/frame-kind
            // values here; presumably it ignores them and builds the connection
            // from the client's stored state — confirm against its definition.
            let dummy_ak = [0u8; 256];
            let dummy_fk = FrameKind::Abridged;
            match self.do_reconnect(&dummy_ak, &dummy_fk).await {
                Ok(conn) => break conn,
                Err(e) => {
                    log::warn!("[layer] Supervisor: reconnect failed ({e})");
                    let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
                    delay_ms = jitter_delay(next).as_millis() as u64;
                }
            }
        };

        let (new_rh, new_fk, new_ak, new_sid) = new_conn;
        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;

        // Initialise the new connection in a separate task: this task goes
        // back to running the reader loop, and the outcome is reported
        // through `init_rx`.
        let (init_tx, init_rx) = oneshot::channel();
        let c = self.clone();
        let utx = self.inner.update_tx.clone();
        tokio::spawn(async move {
            let result = loop {
                // Retry only on flood-wait errors; any other error ends the attempt.
                match c.init_connection().await {
                    Ok(()) => break Ok(()),
                    Err(InvocationError::Rpc(ref r))
                        if r.flood_wait_seconds().is_some() =>
                    {
                        let secs = r.flood_wait_seconds().unwrap();
                        log::warn!(
                            "[layer] Supervisor init_connection FLOOD_WAIT_{secs} — waiting"
                        );
                        sleep(Duration::from_secs(secs + 1)).await;
                    }
                    Err(e) => break Err(e),
                }
            };
            // After a successful init, replay updates missed while
            // disconnected; a failing getDifference is silently skipped.
            if result.is_ok()
                && let Ok(missed) = c.get_difference().await {
                for u in missed { let _ = utx.send(u); }
            }
            let _ = init_tx.send(result);
        });
        restart_init_rx = Some(init_rx);

        log::info!("[layer] Supervisor: restarting reader loop (restart #{restart_count}) …");
    }
}
1337
1338 #[allow(clippy::too_many_arguments)]
1339 async fn reader_loop(
1340 &self,
1341 mut rh: OwnedReadHalf,
1342 mut fk: FrameKind,
1343 mut ak: [u8; 256],
1344 mut sid: i64,
1345 initial_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>>,
1348 new_conn_rx: &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
1349 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
1350 ) {
1351 let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
1356 let mut init_fail_count: u32 = 0;
1361
1362 loop {
1363 tokio::select! {
1364 outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
1366 match outcome {
1367 FrameOutcome::Frame(mut raw) => {
1368 let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
1369 Ok(m) => m,
1370 Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
1371 };
1372 if msg.salt != 0 {
1373 self.inner.writer.lock().await.enc.salt = msg.salt;
1374 }
1375 self.route_frame(msg.body, msg.msg_id).await;
1376
1377 }
1382
1383 FrameOutcome::Error(e) => {
1384 log::warn!("[layer] Reader: connection error: {e}");
1385 drop(init_rx.take()); {
1390 let mut pending = self.inner.pending.lock().await;
1391 let msg = e.to_string();
1392 for (_, tx) in pending.drain() {
1393 let _ = tx.send(Err(InvocationError::Io(
1394 std::io::Error::new(
1395 std::io::ErrorKind::ConnectionReset, msg.clone()))));
1396 }
1397 }
1398 self.inner.writer.lock().await.sent_bodies.clear();
1400
1401 match self.do_reconnect_loop(
1402 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1403 network_hint_rx,
1404 ).await {
1405 Some(rx) => { init_rx = Some(rx); }
1406 None => return, }
1408 }
1409
1410 FrameOutcome::Keepalive => {} }
1412 }
1413
1414 maybe = new_conn_rx.recv() => {
1416 if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
1417 rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
1418 log::info!("[layer] Reader: switched to new connection.");
1419 } else {
1420 break; }
1422 }
1423
1424
1425 init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
1427 init_rx = None;
1428 match init_result {
1429 Ok(Ok(())) => {
1430 init_fail_count = 0;
1431 log::info!("[layer] Reconnected to Telegram ✓ — session live, replaying missed updates …");
1436 for attempt in 1u8..=3 {
1437 match self.save_session().await {
1438 Ok(()) => break,
1439 Err(e) if attempt < 3 => {
1440 log::warn!(
1441 "[layer] save_session failed (attempt {attempt}/3): {e}"
1442 );
1443 sleep(Duration::from_millis(500)).await;
1444 }
1445 Err(e) => {
1446 log::error!(
1447 "[layer] save_session permanently failed after 3 attempts: {e}"
1448 );
1449 }
1450 }
1451 }
1452 }
1453
1454 Ok(Err(e)) => {
1455 let key_is_stale = match &e {
1464 InvocationError::Rpc(r) if r.code == -404 => true,
1466 InvocationError::Io(io) if io.kind() == std::io::ErrorKind::UnexpectedEof
1468 || io.kind() == std::io::ErrorKind::ConnectionReset => true,
1469 _ => false,
1471 };
1472
1473 if key_is_stale {
1474 log::warn!(
1475 "[layer] init_connection failed with definitive bad-key signal ({e}) \
1476 — clearing auth key for fresh DH …"
1477 );
1478 init_fail_count = 0;
1479 let home_dc_id = *self.inner.home_dc_id.lock().await;
1480 let mut opts = self.inner.dc_options.lock().await;
1481 if let Some(entry) = opts.get_mut(&home_dc_id) {
1482 entry.auth_key = None;
1483 }
1484 } else {
1485 init_fail_count += 1;
1486 log::warn!(
1487 "[layer] init_connection failed transiently (attempt {init_fail_count}, {e}) \
1488 — retrying with same key …"
1489 );
1490 }
1491 {
1492 let mut pending = self.inner.pending.lock().await;
1493 let msg = e.to_string();
1494 for (_, tx) in pending.drain() {
1495 let _ = tx.send(Err(InvocationError::Io(
1496 std::io::Error::new(
1497 std::io::ErrorKind::ConnectionReset, msg.clone()))));
1498 }
1499 }
1500 match self.do_reconnect_loop(
1501 0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
1502 ).await {
1503 Some(rx) => { init_rx = Some(rx); }
1504 None => return,
1505 }
1506 }
1507
1508 Err(_) => {
1509 log::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
1511 match self.do_reconnect_loop(
1512 RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
1513 network_hint_rx,
1514 ).await {
1515 Some(rx) => { init_rx = Some(rx); }
1516 None => return,
1517 }
1518 }
1519 }
1520 }
1521 }
1522 }
1523 }
1524
1525 async fn route_frame(&self, body: Vec<u8>, msg_id: i64) {
1527 if body.len() < 4 { return; }
1528 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1529
1530 match cid {
1531 ID_RPC_RESULT => {
1532 if body.len() < 12 { return; }
1533 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1534 let inner = body[12..].to_vec();
1535 self.inner.writer.lock().await.pending_ack.push(msg_id);
1537 let result = unwrap_envelope(inner);
1538 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1539 self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1541 let to_send = match result {
1542 Ok(EnvelopeResult::Payload(p)) => Ok(p),
1543 Ok(EnvelopeResult::Updates(us)) => {
1544 for u in us { let _ = self.inner.update_tx.send(u); }
1545 Ok(vec![])
1546 }
1547 Ok(EnvelopeResult::None) => Ok(vec![]),
1548 Err(e) => Err(e),
1549 };
1550 let _ = tx.send(to_send);
1551 }
1552 }
1553 ID_RPC_ERROR => {
1554 log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
1555 }
1556 ID_MSG_CONTAINER => {
1557 if body.len() < 8 { return; }
1558 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1559 let mut pos = 8usize;
1560 for _ in 0..count {
1561 if pos + 16 > body.len() { break; }
1562 let inner_msg_id = i64::from_le_bytes(body[pos..pos + 8].try_into().unwrap());
1564 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1565 pos += 16;
1566 if pos + inner_len > body.len() { break; }
1567 let inner = body[pos..pos + inner_len].to_vec();
1568 pos += inner_len;
1569 Box::pin(self.route_frame(inner, inner_msg_id)).await;
1570 }
1571 }
1572 ID_GZIP_PACKED => {
1573 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
1574 if let Ok(inflated) = gz_inflate(&bytes) {
1575 Box::pin(self.route_frame(inflated, msg_id)).await;
1577 }
1578 }
1579 ID_BAD_SERVER_SALT => {
1580 if body.len() >= 24 {
1586 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1587 let new_salt = i64::from_le_bytes(body[16..24].try_into().unwrap());
1588
1589 self.inner.writer.lock().await.enc.salt = new_salt;
1591 log::debug!("[layer] bad_server_salt: bad_msg_id={bad_msg_id} salt={new_salt:#x}");
1592
1593 {
1595 let mut w = self.inner.writer.lock().await;
1596 if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1597 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1598 let fk = w.frame_kind.clone();
1599 w.sent_bodies.insert(new_msg_id, orig_body);
1600 drop(w);
1602 let mut pending = self.inner.pending.lock().await;
1603 if let Some(tx) = pending.remove(&bad_msg_id) {
1604 pending.insert(new_msg_id, tx);
1605 drop(pending);
1606 let mut w = self.inner.writer.lock().await;
1607 if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1608 log::warn!("[layer] bad_server_salt re-send failed: {e}");
1609 } else {
1610 log::debug!("[layer] bad_server_salt re-sent {bad_msg_id}→{new_msg_id}");
1611 }
1612 }
1613 }
1614 }
1615
1616 let inner = Arc::clone(&self.inner);
1618 tokio::spawn(async move {
1619 log::debug!("[layer] G-08 proactive GetFutureSalts …");
1620 let mut req_body = Vec::with_capacity(8);
1622 req_body.extend_from_slice(&0xb921bd04_u32.to_le_bytes());
1623 req_body.extend_from_slice(&64_i32.to_le_bytes());
1624 let (wire, fs_msg_id) = {
1626 let mut w = inner.writer.lock().await;
1627 let (wire, id) = w.enc.pack_body_with_msg_id(&req_body, true);
1628 w.sent_bodies.insert(id, req_body);
1629 (wire, id)
1630 };
1631 let fk = inner.writer.lock().await.frame_kind.clone();
1632 let (tx, rx) = tokio::sync::oneshot::channel();
1633 inner.pending.lock().await.insert(fs_msg_id, tx);
1634 {
1635 let mut w = inner.writer.lock().await;
1636 if send_frame_write(&mut w.write_half, &wire, &fk).await.is_err() {
1637 inner.pending.lock().await.remove(&fs_msg_id);
1638 inner.writer.lock().await.sent_bodies.remove(&fs_msg_id);
1639 return;
1640 }
1641 }
1642 let _ = tokio::time::timeout(
1643 std::time::Duration::from_secs(30), rx
1644 ).await;
1645 });
1646 }
1647 }
1648 ID_PONG => {
1649 if body.len() >= 20 {
1653 let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1654 if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
1655 self.inner.writer.lock().await.sent_bodies.remove(&ping_msg_id);
1656 let _ = tx.send(Ok(body));
1657 }
1658 }
1659 }
1660 ID_FUTURE_SALTS => {
1662 if body.len() >= 12 {
1674 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1675 if body.len() >= 40 {
1676 let count = u32::from_le_bytes(body[20..24].try_into().unwrap()) as usize;
1677 if count > 0 {
1678 let salt_val = i64::from_le_bytes(body[32..40].try_into().unwrap());
1679 self.inner.writer.lock().await.enc.salt = salt_val;
1680 log::debug!("[layer] G-09 FutureSalts: salt={salt_val:#x} count={count}");
1681 }
1682 }
1683 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1684 self.inner.writer.lock().await.sent_bodies.remove(&req_msg_id);
1685 let _ = tx.send(Ok(body));
1686 }
1687 }
1688 }
1689 ID_NEW_SESSION => {
1690 if body.len() >= 28 {
1692 let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
1693 self.inner.writer.lock().await.enc.salt = server_salt;
1694 log::debug!("[layer] new_session_created — salt reset to {server_salt:#x}");
1695 }
1696 }
1697 ID_BAD_MSG_NOTIFY => {
1699 if body.len() < 20 { return; }
1701 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1702 let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
1703 log::warn!("[layer] bad_msg_notification: msg_id={bad_msg_id} code={error_code}");
1704
1705 let resend: Option<(Vec<u8>, i64, FrameKind)> = {
1709 let mut w = self.inner.writer.lock().await;
1710 if error_code == 16 || error_code == 17 {
1712 w.enc.correct_time_offset(bad_msg_id);
1713 }
1714 if error_code == 32 || error_code == 33 {
1716 w.enc.correct_seq_no(error_code);
1717 }
1718 if let Some(orig_body) = w.sent_bodies.remove(&bad_msg_id) {
1720 let (wire, new_msg_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1721 let fk = w.frame_kind.clone();
1722 w.sent_bodies.insert(new_msg_id, orig_body);
1723 Some((wire, new_msg_id, fk))
1724 } else {
1725 None
1726 }
1727 }; match resend {
1730 Some((wire, new_msg_id, fk)) => {
1731 let has_waiter = {
1733 let mut pending = self.inner.pending.lock().await;
1734 if let Some(tx) = pending.remove(&bad_msg_id) {
1735 pending.insert(new_msg_id, tx);
1736 true
1737 } else {
1738 false
1739 }
1740 };
1741 if has_waiter {
1742 let mut w = self.inner.writer.lock().await;
1744 if let Err(e) = send_frame_write(&mut w.write_half, &wire, &fk).await {
1745 log::warn!("[layer] G-02 re-send failed: {e}");
1746 w.sent_bodies.remove(&new_msg_id);
1747 } else {
1748 log::debug!("[layer] G-02 re-sent {bad_msg_id}→{new_msg_id}");
1749 }
1750 } else {
1751 self.inner.writer.lock().await.sent_bodies.remove(&new_msg_id);
1752 }
1753 }
1754 None => {
1755 if let Some(tx) = self.inner.pending.lock().await.remove(&bad_msg_id) {
1757 let _ = tx.send(Err(InvocationError::Deserialize(
1758 format!("bad_msg_notification code={error_code}")
1759 )));
1760 }
1761 }
1762 }
1763 }
1764 ID_MSG_DETAILED_INFO => {
1766 if body.len() >= 20 {
1770 let answer_msg_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
1771 self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1772 log::trace!("[layer] G-11 MsgDetailedInfo: queued ack for answer_msg_id={answer_msg_id}");
1773 }
1774 }
1775 ID_MSG_NEW_DETAIL_INFO => {
1776 if body.len() >= 12 {
1779 let answer_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1780 self.inner.writer.lock().await.pending_ack.push(answer_msg_id);
1781 log::trace!("[layer] G-11 MsgNewDetailedInfo: queued ack for {answer_msg_id}");
1782 }
1783 }
1784 ID_MSG_RESEND_REQ => {
1786 if body.len() >= 12 {
1791 let count = u32::from_le_bytes(body[8..12].try_into().unwrap()) as usize;
1792 let mut w = self.inner.writer.lock().await;
1793 let fk = w.frame_kind.clone();
1794 for i in 0..count {
1795 let off = 12 + i * 8;
1796 if off + 8 > body.len() { break; }
1797 let resend_id = i64::from_le_bytes(body[off..off + 8].try_into().unwrap());
1798 if let Some(orig_body) = w.sent_bodies.remove(&resend_id) {
1799 let (wire, new_id) = w.enc.pack_body_with_msg_id(&orig_body, true);
1800 let mut pending = self.inner.pending.lock().await;
1802 if let Some(tx) = pending.remove(&resend_id) {
1803 pending.insert(new_id, tx);
1804 }
1805 drop(pending);
1806 w.sent_bodies.insert(new_id, orig_body);
1807 send_frame_write(&mut w.write_half, &wire, &fk).await.ok();
1808 log::debug!("[layer] G-14 MsgResendReq: resent {resend_id} → {new_id}");
1809 }
1810 }
1811 }
1812 }
1813 0xe22045fc => {
1815 log::warn!("[layer] destroy_session_ok received — session terminated by server");
1816 }
1817 0x62d350c9 => {
1818 log::warn!("[layer] destroy_session_none received — session was already gone");
1819 }
1820 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
1821 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
1822 | ID_UPDATES_TOO_LONG => {
1823 self.inner.writer.lock().await.pending_ack.push(msg_id);
1825 self.dispatch_updates(&body).await;
1827 }
1828 _ => {}
1829 }
1830 }
1831
1832
1833 async fn dispatch_updates(&self, body: &[u8]) {
1838 if body.len() < 4 { return; }
1839 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1840
1841 if cid == 0xe317af7e_u32 {
1843 log::warn!("[layer] updatesTooLong — getDifference");
1844 let c = self.clone();
1845 let utx = self.inner.update_tx.clone();
1846 tokio::spawn(async move {
1847 match c.get_difference().await {
1848 Ok(updates) => { for u in updates { let _ = utx.send(u); } }
1849 Err(e) => log::warn!("[layer] getDifference after updatesTooLong: {e}"),
1850 }
1851 });
1852 return;
1853 }
1854
1855 if cid == 0x313bc7f8 || cid == 0x4d6deea5 {
1858 for u in update::parse_updates(body) {
1862 let _ = self.inner.update_tx.send(u);
1863 }
1864 return;
1865 }
1866
1867 {
1869 use layer_tl_types::{Cursor, Deserializable};
1870 let mut cur = Cursor::from_slice(body);
1871 let seq_info: Option<(i32, i32)> = match cid {
1872 0x74ae4240 => {
1873 match tl::enums::Updates::deserialize(&mut cur) {
1875 Ok(tl::enums::Updates::Updates(u)) => Some((u.seq, u.seq)),
1876 _ => None,
1877 }
1878 }
1879 0x725b04c3 => {
1880 match tl::enums::Updates::deserialize(&mut cur) {
1882 Ok(tl::enums::Updates::Combined(u)) => Some((u.seq, u.seq_start)),
1883 _ => None,
1884 }
1885 }
1886 _ => None,
1887 };
1888 if let Some((seq, seq_start)) = seq_info
1889 && seq != 0 {
1890 let c = self.clone();
1891 let utx = self.inner.update_tx.clone();
1892 tokio::spawn(async move {
1893 match c.check_and_fill_seq_gap(seq, seq_start).await {
1894 Ok(extra) => { for u in extra { let _ = utx.send(u); } }
1895 Err(e) => log::warn!("[layer] seq gap fill: {e}"),
1896 }
1897 });
1898 }
1899 }
1900
1901 use layer_tl_types::{Cursor, Deserializable};
1903 let mut cur = Cursor::from_slice(body);
1904 let raw: Vec<tl::enums::Update> = match cid {
1905 0x78d4dec1 => { match tl::types::UpdateShort::deserialize(&mut cur) {
1907 Ok(u) => vec![u.update],
1908 Err(_) => vec![],
1909 }
1910 }
1911 0x74ae4240 => { match tl::enums::Updates::deserialize(&mut cur) {
1913 Ok(tl::enums::Updates::Updates(u)) => u.updates,
1914 _ => vec![],
1915 }
1916 }
1917 0x725b04c3 => { match tl::enums::Updates::deserialize(&mut cur) {
1919 Ok(tl::enums::Updates::Combined(u)) => u.updates,
1920 _ => vec![],
1921 }
1922 }
1923 _ => vec![],
1924 };
1925
1926 for upd in raw {
1927 self.dispatch_single_update(upd).await;
1928 }
1929 }
1930
1931 async fn dispatch_single_update(&self, upd: tl::enums::Update) {
1934 enum Kind {
1937 GlobalPts { pts: i32, pts_count: i32, carry: bool },
1938 ChannelPts { channel_id: i64, pts: i32, pts_count: i32, carry: bool },
1939 Qts { qts: i32 },
1940 Passthrough,
1941 }
1942
1943 fn ch_from_msg(msg: &tl::enums::Message) -> i64 {
1944 if let tl::enums::Message::Message(m) = msg
1945 && let tl::enums::Peer::Channel(c) = &m.peer_id { return c.channel_id; }
1946 0
1947 }
1948
1949 let kind = {
1950 use tl::enums::Update::*;
1951 match &upd {
1952 NewMessage(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
1953 EditMessage(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
1954 DeleteMessages(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: true },
1955 ReadHistoryInbox(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
1956 ReadHistoryOutbox(u) => Kind::GlobalPts { pts: u.pts, pts_count: u.pts_count, carry: false },
1957 NewChannelMessage(u) => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
1958 EditChannelMessage(u) => Kind::ChannelPts { channel_id: ch_from_msg(&u.message), pts: u.pts, pts_count: u.pts_count, carry: true },
1959 DeleteChannelMessages(u) => Kind::ChannelPts { channel_id: u.channel_id, pts: u.pts, pts_count: u.pts_count, carry: true },
1960 NewEncryptedMessage(u) => Kind::Qts { qts: u.qts },
1961 _ => Kind::Passthrough,
1962 }
1963 };
1964
1965 let high = update::from_single_update_pub(upd);
1966
1967 let to_send: Vec<update::Update> = match kind {
1968 Kind::GlobalPts { pts, pts_count, carry } => {
1969 let first = if carry { high.into_iter().next() } else { None };
1970 let c = self.clone();
1974 let utx = self.inner.update_tx.clone();
1975 tokio::spawn(async move {
1976 match c.check_and_fill_gap(pts, pts_count, first).await {
1977 Ok(v) => { for u in v { let _ = utx.send(u); } }
1978 Err(e) => log::warn!("[layer] pts gap: {e}"),
1979 }
1980 });
1981 vec![]
1982 }
1983 Kind::ChannelPts { channel_id, pts, pts_count, carry } => {
1984 let first = if carry { high.into_iter().next() } else { None };
1985 if channel_id != 0 {
1986 let c = self.clone();
1988 let utx = self.inner.update_tx.clone();
1989 tokio::spawn(async move {
1990 match c.check_and_fill_channel_gap(channel_id, pts, pts_count, first).await {
1991 Ok(v) => { for u in v { let _ = utx.send(u); } }
1992 Err(e) => log::warn!("[layer] ch pts gap: {e}"),
1993 }
1994 });
1995 vec![]
1996 } else {
1997 first.into_iter().collect()
1998 }
1999 }
2000 Kind::Qts { qts } => {
2001 let c = self.clone();
2003 tokio::spawn(async move {
2004 if let Err(e) = c.check_and_fill_qts_gap(qts, 1).await {
2005 log::warn!("[layer] qts gap: {e}");
2006 }
2007 });
2008 vec![]
2009 }
2010 Kind::Passthrough => high,
2011 };
2012
2013 for u in to_send {
2014 let _ = self.inner.update_tx.send(u);
2015 }
2016 }
2017
2018 async fn do_reconnect_loop(
2028 &self,
2029 initial_delay_ms: u64,
2030 rh: &mut OwnedReadHalf,
2031 fk: &mut FrameKind,
2032 ak: &mut [u8; 256],
2033 sid: &mut i64,
2034 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
2035 ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
2036 let mut delay_ms = if initial_delay_ms == 0 {
2037 0
2040 } else {
2041 initial_delay_ms.max(RECONNECT_BASE_MS)
2042 };
2043 loop {
2044 log::info!("[layer] Reconnecting in {delay_ms} ms …");
2045 tokio::select! {
2046 _ = sleep(Duration::from_millis(delay_ms)) => {}
2047 hint = network_hint_rx.recv() => {
2048 hint?; log::info!("[layer] Network hint → skipping backoff, reconnecting now");
2050 }
2051 }
2052
2053 match self.do_reconnect(ak, fk).await {
2054 Ok((new_rh, new_fk, new_ak, new_sid)) => {
2055 *rh = new_rh; *fk = new_fk; *ak = new_ak; *sid = new_sid;
2056 log::info!("[layer] TCP reconnected ✓ — initialising session …");
2057
2058 let (init_tx, init_rx) = oneshot::channel();
2062 let c = self.clone();
2063 let utx = self.inner.update_tx.clone();
2064 tokio::spawn(async move {
2065 let result = loop {
2070 match c.init_connection().await {
2071 Ok(()) => break Ok(()),
2072 Err(InvocationError::Rpc(ref r))
2073 if r.flood_wait_seconds().is_some() =>
2074 {
2075 let secs = r.flood_wait_seconds().unwrap();
2076 log::warn!(
2077 "[layer] init_connection FLOOD_WAIT_{secs} — waiting before retry"
2078 );
2079 sleep(Duration::from_secs(secs + 1)).await;
2080 }
2082 Err(e) => break Err(e),
2083 }
2084 };
2085 if result.is_ok() {
2086 if let Ok(missed) = c.get_difference().await {
2088 for u in missed { let _ = utx.send(u); }
2089 }
2090 }
2091 let _ = init_tx.send(result);
2092 });
2093 return Some(init_rx);
2094 }
2095 Err(e) => {
2096 log::warn!("[layer] Reconnect attempt failed: {e}");
2097 let next = delay_ms.saturating_mul(2).clamp(RECONNECT_BASE_MS, RECONNECT_MAX_SECS * 1_000);
2101 delay_ms = jitter_delay(next).as_millis() as u64;
2102 }
2103 }
2104 }
2105 }
2106
2107 async fn do_reconnect(
2109 &self,
2110 _old_auth_key: &[u8; 256],
2111 _old_frame_kind: &FrameKind,
2112 ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
2113 let home_dc_id = *self.inner.home_dc_id.lock().await;
2114 let (addr, saved_key, first_salt, time_offset) = {
2115 let opts = self.inner.dc_options.lock().await;
2116 match opts.get(&home_dc_id) {
2117 Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
2118 None => ("149.154.167.51:443".to_string(), None, 0, 0),
2119 }
2120 };
2121 let socks5 = self.inner.socks5.clone();
2122 let transport = self.inner.transport.clone();
2123
2124 let new_conn = if let Some(key) = saved_key {
2125 log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
2126 match Connection::connect_with_key(
2127 &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
2128 ).await {
2129 Ok(c) => c,
2130 Err(e2) => {
2131 log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
2132 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
2133 }
2134 }
2135 } else {
2136 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
2137 };
2138
2139 let (new_writer, new_read, new_fk) = new_conn.into_writer();
2140 let new_ak = new_writer.enc.auth_key_bytes();
2141 let new_sid = new_writer.enc.session_id();
2142 *self.inner.writer.lock().await = new_writer;
2143
2144 Ok((new_read, new_fk, new_ak, new_sid))
2156 }
2157
2158 pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
2162 let p = self.resolve_peer(peer).await?;
2163 self.send_message_to_peer(p, text).await
2164 }
2165
2166 pub async fn send_message_to_peer(
2168 &self,
2169 peer: tl::enums::Peer,
2170 text: &str,
2171 ) -> Result<(), InvocationError> {
2172 self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
2173 }
2174
2175 pub async fn send_message_to_peer_ex(
2177 &self,
2178 peer: tl::enums::Peer,
2179 msg: &InputMessage,
2180 ) -> Result<(), InvocationError> {
2181 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2182 let schedule = if msg.schedule_once_online {
2183 Some(0x7FFF_FFFEi32)
2184 } else {
2185 msg.schedule_date
2186 };
2187
2188 if let Some(media) = &msg.media {
2190 let req = tl::functions::messages::SendMedia {
2191 silent: msg.silent,
2192 background: msg.background,
2193 clear_draft: msg.clear_draft,
2194 noforwards: false,
2195 update_stickersets_order: false,
2196 invert_media: msg.invert_media,
2197 allow_paid_floodskip: false,
2198 peer: input_peer,
2199 reply_to: msg.reply_header(),
2200 media: media.clone(),
2201 message: msg.text.clone(),
2202 random_id: random_i64(),
2203 reply_markup: msg.reply_markup.clone(),
2204 entities: msg.entities.clone(),
2205 schedule_date: schedule,
2206 schedule_repeat_period: None,
2207 send_as: None,
2208 quick_reply_shortcut: None,
2209 effect: None,
2210 allow_paid_stars: None,
2211 suggested_post: None,
2212 };
2213 return self.rpc_call_raw_pub(&req).await.map(|_| ());
2214 }
2215
2216 let req = tl::functions::messages::SendMessage {
2217 no_webpage: msg.no_webpage,
2218 silent: msg.silent,
2219 background: msg.background,
2220 clear_draft: msg.clear_draft,
2221 noforwards: false,
2222 update_stickersets_order: false,
2223 invert_media: msg.invert_media,
2224 allow_paid_floodskip: false,
2225 peer: input_peer,
2226 reply_to: msg.reply_header(),
2227 message: msg.text.clone(),
2228 random_id: random_i64(),
2229 reply_markup: msg.reply_markup.clone(),
2230 entities: msg.entities.clone(),
2231 schedule_date: schedule,
2232 schedule_repeat_period: None,
2233 send_as: None,
2234 quick_reply_shortcut: None,
2235 effect: None,
2236 allow_paid_stars: None,
2237 suggested_post: None,
2238 };
2239 self.rpc_write(&req).await
2240 }
2241
2242 pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
2244 let req = tl::functions::messages::SendMessage {
2245 no_webpage: false,
2246 silent: false,
2247 background: false,
2248 clear_draft: false,
2249 noforwards: false,
2250 update_stickersets_order: false,
2251 invert_media: false,
2252 allow_paid_floodskip: false,
2253 peer: tl::enums::InputPeer::PeerSelf,
2254 reply_to: None,
2255 message: text.to_string(),
2256 random_id: random_i64(),
2257 reply_markup: None,
2258 entities: None,
2259 schedule_date: None,
2260 schedule_repeat_period: None,
2261 send_as: None,
2262 quick_reply_shortcut: None,
2263 effect: None,
2264 allow_paid_stars: None,
2265 suggested_post: None,
2266 };
2267 self.rpc_write(&req).await
2268 }
2269
2270 pub async fn edit_message(
2272 &self,
2273 peer: tl::enums::Peer,
2274 message_id: i32,
2275 new_text: &str,
2276 ) -> Result<(), InvocationError> {
2277 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2278 let req = tl::functions::messages::EditMessage {
2279 no_webpage: false,
2280 invert_media: false,
2281 peer: input_peer,
2282 id: message_id,
2283 message: Some(new_text.to_string()),
2284 media: None,
2285 reply_markup: None,
2286 entities: None,
2287 schedule_date: None,
2288 schedule_repeat_period: None,
2289 quick_reply_shortcut_id: None,
2290 };
2291 self.rpc_write(&req).await
2292 }
2293
2294 pub async fn forward_messages(
2296 &self,
2297 destination: tl::enums::Peer,
2298 message_ids: &[i32],
2299 source: tl::enums::Peer,
2300 ) -> Result<(), InvocationError> {
2301 let cache = self.inner.peer_cache.read().await;
2302 let to_peer = cache.peer_to_input(&destination);
2303 let from_peer = cache.peer_to_input(&source);
2304 drop(cache);
2305
2306 let req = tl::functions::messages::ForwardMessages {
2307 silent: false,
2308 background: false,
2309 with_my_score: false,
2310 drop_author: false,
2311 drop_media_captions: false,
2312 noforwards: false,
2313 from_peer,
2314 id: message_ids.to_vec(),
2315 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
2316 to_peer,
2317 top_msg_id: None,
2318 reply_to: None,
2319 schedule_date: None,
2320 schedule_repeat_period: None,
2321 send_as: None,
2322 quick_reply_shortcut: None,
2323 effect: None,
2324 video_timestamp: None,
2325 allow_paid_stars: None,
2326 allow_paid_floodskip: false,
2327 suggested_post: None,
2328 };
2329 self.rpc_write(&req).await
2330 }
2331
2332 pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
2334 let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
2335 self.rpc_write(&req).await
2336 }
2337
2338 pub async fn get_messages_by_id(
2340 &self,
2341 peer: tl::enums::Peer,
2342 ids: &[i32],
2343 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2344 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2345 let id_list: Vec<tl::enums::InputMessage> = ids.iter()
2346 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
2347 .collect();
2348 let req = tl::functions::channels::GetMessages {
2349 channel: match &input_peer {
2350 tl::enums::InputPeer::Channel(c) =>
2351 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2352 channel_id: c.channel_id, access_hash: c.access_hash
2353 }),
2354 _ => return self.get_messages_user(input_peer, id_list).await,
2355 },
2356 id: id_list,
2357 };
2358 let body = self.rpc_call_raw(&req).await?;
2359 let mut cur = Cursor::from_slice(&body);
2360 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2361 tl::enums::messages::Messages::Messages(m) => m.messages,
2362 tl::enums::messages::Messages::Slice(m) => m.messages,
2363 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2364 tl::enums::messages::Messages::NotModified(_) => vec![],
2365 };
2366 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2367 }
2368
2369 async fn get_messages_user(
2370 &self,
2371 _peer: tl::enums::InputPeer,
2372 ids: Vec<tl::enums::InputMessage>,
2373 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2374 let req = tl::functions::messages::GetMessages { id: ids };
2375 let body = self.rpc_call_raw(&req).await?;
2376 let mut cur = Cursor::from_slice(&body);
2377 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2378 tl::enums::messages::Messages::Messages(m) => m.messages,
2379 tl::enums::messages::Messages::Slice(m) => m.messages,
2380 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2381 tl::enums::messages::Messages::NotModified(_) => vec![],
2382 };
2383 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2384 }
2385
2386 pub async fn get_pinned_message(
2388 &self,
2389 peer: tl::enums::Peer,
2390 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
2391 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2392 let req = tl::functions::messages::Search {
2393 peer: input_peer,
2394 q: String::new(),
2395 from_id: None,
2396 saved_peer_id: None,
2397 saved_reaction: None,
2398 top_msg_id: None,
2399 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
2400 min_date: 0,
2401 max_date: 0,
2402 offset_id: 0,
2403 add_offset: 0,
2404 limit: 1,
2405 max_id: 0,
2406 min_id: 0,
2407 hash: 0,
2408 };
2409 let body = self.rpc_call_raw(&req).await?;
2410 let mut cur = Cursor::from_slice(&body);
2411 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2412 tl::enums::messages::Messages::Messages(m) => m.messages,
2413 tl::enums::messages::Messages::Slice(m) => m.messages,
2414 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2415 tl::enums::messages::Messages::NotModified(_) => vec![],
2416 };
2417 Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
2418 }
2419
2420 pub async fn pin_message(
2422 &self,
2423 peer: tl::enums::Peer,
2424 message_id: i32,
2425 silent: bool,
2426 unpin: bool,
2427 pm_oneside: bool,
2428 ) -> Result<(), InvocationError> {
2429 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2430 let req = tl::functions::messages::UpdatePinnedMessage {
2431 silent,
2432 unpin,
2433 pm_oneside,
2434 peer: input_peer,
2435 id: message_id,
2436 };
2437 self.rpc_write(&req).await
2438 }
2439
2440 pub async fn unpin_message(
2442 &self,
2443 peer: tl::enums::Peer,
2444 message_id: i32,
2445 ) -> Result<(), InvocationError> {
2446 self.pin_message(peer, message_id, true, true, false).await
2447 }
2448
2449 pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2451 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2452 let req = tl::functions::messages::UnpinAllMessages {
2453 peer: input_peer,
2454 top_msg_id: None,
2455 saved_peer_id: None,
2456 };
2457 self.rpc_write(&req).await
2458 }
2459
2460 pub async fn search_messages(
2465 &self,
2466 peer: tl::enums::Peer,
2467 query: &str,
2468 limit: i32,
2469 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2470 self.search(peer, query).limit(limit).fetch(self).await
2471 }
2472
2473 pub fn search(&self, peer: tl::enums::Peer, query: &str) -> SearchBuilder {
2475 SearchBuilder::new(peer, query.to_string())
2476 }
2477
2478 pub async fn search_global(
2480 &self,
2481 query: &str,
2482 limit: i32,
2483 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2484 self.search_global_builder(query).limit(limit).fetch(self).await
2485 }
2486
2487 pub fn search_global_builder(&self, query: &str) -> GlobalSearchBuilder {
2489 GlobalSearchBuilder::new(query.to_string())
2490 }
2491
2492 pub async fn get_scheduled_messages(
2509 &self,
2510 peer: tl::enums::Peer,
2511 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2512 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2513 let req = tl::functions::messages::GetScheduledHistory {
2514 peer: input_peer,
2515 hash: 0,
2516 };
2517 let body = self.rpc_call_raw(&req).await?;
2518 let mut cur = Cursor::from_slice(&body);
2519 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2520 tl::enums::messages::Messages::Messages(m) => m.messages,
2521 tl::enums::messages::Messages::Slice(m) => m.messages,
2522 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2523 tl::enums::messages::Messages::NotModified(_) => vec![],
2524 };
2525 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2526 }
2527
2528 pub async fn delete_scheduled_messages(
2530 &self,
2531 peer: tl::enums::Peer,
2532 ids: Vec<i32>,
2533 ) -> Result<(), InvocationError> {
2534 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2535 let req = tl::functions::messages::DeleteScheduledMessages {
2536 peer: input_peer,
2537 id: ids,
2538 };
2539 self.rpc_write(&req).await
2540 }
2541
2542 pub async fn edit_inline_message(
2560 &self,
2561 id: tl::enums::InputBotInlineMessageId,
2562 new_text: &str,
2563 reply_markup: Option<tl::enums::ReplyMarkup>,
2564 ) -> Result<bool, InvocationError> {
2565 let req = tl::functions::messages::EditInlineBotMessage {
2566 no_webpage: false,
2567 invert_media: false,
2568 id,
2569 message: Some(new_text.to_string()),
2570 media: None,
2571 reply_markup,
2572 entities: None,
2573 };
2574 let body = self.rpc_call_raw(&req).await?;
2575 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2577 }
2578
2579 pub async fn answer_callback_query(
2581 &self,
2582 query_id: i64,
2583 text: Option<&str>,
2584 alert: bool,
2585 ) -> Result<bool, InvocationError> {
2586 let req = tl::functions::messages::SetBotCallbackAnswer {
2587 alert,
2588 query_id,
2589 message: text.map(|s| s.to_string()),
2590 url: None,
2591 cache_time: 0,
2592 };
2593 let body = self.rpc_call_raw(&req).await?;
2594 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2595 }
2596
2597 pub async fn answer_inline_query(
2598 &self,
2599 query_id: i64,
2600 results: Vec<tl::enums::InputBotInlineResult>,
2601 cache_time: i32,
2602 is_personal: bool,
2603 next_offset: Option<String>,
2604 ) -> Result<bool, InvocationError> {
2605 let req = tl::functions::messages::SetInlineBotResults {
2606 gallery: false,
2607 private: is_personal,
2608 query_id,
2609 results,
2610 cache_time,
2611 next_offset,
2612 switch_pm: None,
2613 switch_webview: None,
2614 };
2615 let body = self.rpc_call_raw(&req).await?;
2616 Ok(body.len() >= 4 && u32::from_le_bytes(body[..4].try_into().unwrap()) == 0x997275b5)
2617 }
2618
2619 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
2623 let req = tl::functions::messages::GetDialogs {
2624 exclude_pinned: false,
2625 folder_id: None,
2626 offset_date: 0,
2627 offset_id: 0,
2628 offset_peer: tl::enums::InputPeer::Empty,
2629 limit,
2630 hash: 0,
2631 };
2632
2633 let body = self.rpc_call_raw(&req).await?;
2634 let mut cur = Cursor::from_slice(&body);
2635 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2636 tl::enums::messages::Dialogs::Dialogs(d) => d,
2637 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
2638 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2639 },
2640 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2641 };
2642
2643 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2645 .map(|m| {
2646 let id = match &m {
2647 tl::enums::Message::Message(x) => x.id,
2648 tl::enums::Message::Service(x) => x.id,
2649 tl::enums::Message::Empty(x) => x.id,
2650 };
2651 (id, m)
2652 })
2653 .collect();
2654
2655 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2657 .filter_map(|u| {
2658 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2659 })
2660 .collect();
2661
2662 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2664 .filter_map(|c| {
2665 let id = match &c {
2666 tl::enums::Chat::Chat(x) => x.id,
2667 tl::enums::Chat::Forbidden(x) => x.id,
2668 tl::enums::Chat::Channel(x) => x.id,
2669 tl::enums::Chat::ChannelForbidden(x) => x.id,
2670 tl::enums::Chat::Empty(x) => x.id,
2671 };
2672 Some((id, c))
2673 })
2674 .collect();
2675
2676 {
2678 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2679 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2680 self.cache_users_slice(&u_list).await;
2681 self.cache_chats_slice(&c_list).await;
2682 }
2683
2684 let result = raw.dialogs.into_iter().map(|d| {
2685 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2686 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2687
2688 let message = msg_map.get(&top_id).cloned();
2689 let entity = peer.and_then(|p| match p {
2690 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2691 _ => None,
2692 });
2693 let chat = peer.and_then(|p| match p {
2694 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2695 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2696 _ => None,
2697 });
2698
2699 Dialog { raw: d, message, entity, chat }
2700 }).collect();
2701
2702 Ok(result)
2703 }
2704
2705 #[allow(dead_code)]
2707 async fn get_dialogs_raw(
2708 &self,
2709 req: tl::functions::messages::GetDialogs,
2710 ) -> Result<Vec<Dialog>, InvocationError> {
2711 let body = self.rpc_call_raw(&req).await?;
2712 let mut cur = Cursor::from_slice(&body);
2713 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2714 tl::enums::messages::Dialogs::Dialogs(d) => d,
2715 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
2716 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2717 },
2718 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2719 };
2720
2721 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2722 .map(|m| {
2723 let id = match &m {
2724 tl::enums::Message::Message(x) => x.id,
2725 tl::enums::Message::Service(x) => x.id,
2726 tl::enums::Message::Empty(x) => x.id,
2727 };
2728 (id, m)
2729 })
2730 .collect();
2731
2732 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2733 .filter_map(|u| {
2734 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2735 })
2736 .collect();
2737
2738 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2739 .filter_map(|c| {
2740 let id = match &c {
2741 tl::enums::Chat::Chat(x) => x.id,
2742 tl::enums::Chat::Forbidden(x) => x.id,
2743 tl::enums::Chat::Channel(x) => x.id,
2744 tl::enums::Chat::ChannelForbidden(x) => x.id,
2745 tl::enums::Chat::Empty(x) => x.id,
2746 };
2747 Some((id, c))
2748 })
2749 .collect();
2750
2751 {
2752 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2753 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2754 self.cache_users_slice(&u_list).await;
2755 self.cache_chats_slice(&c_list).await;
2756 }
2757
2758 let result = raw.dialogs.into_iter().map(|d| {
2759 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2760 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2761
2762 let message = msg_map.get(&top_id).cloned();
2763 let entity = peer.and_then(|p| match p {
2764 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2765 _ => None,
2766 });
2767 let chat = peer.and_then(|p| match p {
2768 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2769 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2770 _ => None,
2771 });
2772
2773 Dialog { raw: d, message, entity, chat }
2774 }).collect();
2775
2776 Ok(result)
2777 }
2778
2779 async fn get_dialogs_raw_with_count(
2781 &self,
2782 req: tl::functions::messages::GetDialogs,
2783 ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
2784 let body = self.rpc_call_raw(&req).await?;
2785 let mut cur = Cursor::from_slice(&body);
2786 let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2787 tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
2788 tl::enums::messages::Dialogs::Slice(d) => {
2789 let cnt = Some(d.count);
2790 (tl::types::messages::Dialogs {
2791 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2792 }, cnt)
2793 }
2794 tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
2795 };
2796
2797 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2798 .filter_map(|m| {
2799 let id = match &m {
2800 tl::enums::Message::Message(x) => x.id,
2801 tl::enums::Message::Service(x) => x.id,
2802 tl::enums::Message::Empty(x) => x.id,
2803 };
2804 Some((id, m))
2805 }).collect();
2806
2807 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2808 .filter_map(|u| {
2809 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2810 }).collect();
2811
2812 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2813 .filter_map(|c| {
2814 let id = match &c {
2815 tl::enums::Chat::Chat(x) => x.id,
2816 tl::enums::Chat::Forbidden(x) => x.id,
2817 tl::enums::Chat::Channel(x) => x.id,
2818 tl::enums::Chat::ChannelForbidden(x) => x.id,
2819 tl::enums::Chat::Empty(x) => x.id,
2820 };
2821 Some((id, c))
2822 }).collect();
2823
2824 {
2825 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2826 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2827 self.cache_users_slice(&u_list).await;
2828 self.cache_chats_slice(&c_list).await;
2829 }
2830
2831 let result = raw.dialogs.into_iter().map(|d| {
2832 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2833 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2834 let message = msg_map.get(&top_id).cloned();
2835 let entity = peer.and_then(|p| match p {
2836 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2837 _ => None,
2838 });
2839 let chat = peer.and_then(|p| match p {
2840 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2841 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2842 _ => None,
2843 });
2844 Dialog { raw: d, message, entity, chat }
2845 }).collect();
2846
2847 Ok((result, count))
2848 }
2849
2850 async fn get_messages_with_count(
2852 &self,
2853 peer: tl::enums::InputPeer,
2854 limit: i32,
2855 offset_id: i32,
2856 ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
2857 let req = tl::functions::messages::GetHistory {
2858 peer, offset_id, offset_date: 0, add_offset: 0,
2859 limit, max_id: 0, min_id: 0, hash: 0,
2860 };
2861 let body = self.rpc_call_raw(&req).await?;
2862 let mut cur = Cursor::from_slice(&body);
2863 let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2864 tl::enums::messages::Messages::Messages(m) => (m.messages, None),
2865 tl::enums::messages::Messages::Slice(m) => {
2866 let cnt = Some(m.count);
2867 (m.messages, cnt)
2868 }
2869 tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
2870 tl::enums::messages::Messages::NotModified(_) => (vec![], None),
2871 };
2872 Ok((msgs.into_iter().map(update::IncomingMessage::from_raw).collect(), count))
2873 }
2874
2875 pub async fn download_media_to_file(
2886 &self,
2887 location: tl::enums::InputFileLocation,
2888 path: impl AsRef<std::path::Path>,
2889 ) -> Result<(), InvocationError> {
2890 let bytes = self.download_media(location).await?;
2891 std::fs::write(path, &bytes).map_err(InvocationError::Io)?;
2892 Ok(())
2893 }
2894
2895 pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2896 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2897 let req = tl::functions::messages::DeleteHistory {
2898 just_clear: false,
2899 revoke: false,
2900 peer: input_peer,
2901 max_id: 0,
2902 min_date: None,
2903 max_date: None,
2904 };
2905 self.rpc_write(&req).await
2906 }
2907
2908 pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2910 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2911 match &input_peer {
2912 tl::enums::InputPeer::Channel(c) => {
2913 let req = tl::functions::channels::ReadHistory {
2914 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2915 channel_id: c.channel_id, access_hash: c.access_hash,
2916 }),
2917 max_id: 0,
2918 };
2919 self.rpc_call_raw(&req).await?;
2920 }
2921 _ => {
2922 let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
2923 self.rpc_call_raw(&req).await?;
2924 }
2925 }
2926 Ok(())
2927 }
2928
2929 pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2931 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2932 let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
2933 self.rpc_write(&req).await
2934 }
2935
2936 pub async fn send_chat_action(
2944 &self,
2945 peer: tl::enums::Peer,
2946 action: tl::enums::SendMessageAction,
2947 ) -> Result<(), InvocationError> {
2948 self.send_chat_action_ex(peer, action, None).await
2949 }
2950
2951 pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2955 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
2956 match input_peer {
2957 tl::enums::InputPeer::Channel(c) => {
2958 let req = tl::functions::channels::JoinChannel {
2959 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2960 channel_id: c.channel_id, access_hash: c.access_hash,
2961 }),
2962 };
2963 self.rpc_call_raw(&req).await?;
2964 }
2965 tl::enums::InputPeer::Chat(c) => {
2966 let req = tl::functions::messages::AddChatUser {
2967 chat_id: c.chat_id,
2968 user_id: tl::enums::InputUser::UserSelf,
2969 fwd_limit: 0,
2970 };
2971 self.rpc_call_raw(&req).await?;
2972 }
2973 _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
2974 }
2975 Ok(())
2976 }
2977
2978 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
2980 let hash = Self::parse_invite_hash(link)
2981 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
2982 let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
2983 self.rpc_write(&req).await
2984 }
2985
/// Extracts the invite hash from a chat invite link.
///
/// Recognised forms (checked in this order):
/// - `…/+<hash>`
/// - `…/joinchat/<hash>`
/// - `…join?invite=<hash>` (the `tg://` deep-link form)
///
/// Surrounding whitespace is ignored, and the hash ends at the first `/`,
/// `?`, `#` or `&`, so trailing slashes, query strings and fragments are not
/// returned as part of the hash.
///
/// # Returns
/// `Some(hash)` borrowed from `link`, or `None` when no marker is present or
/// the hash would be empty.
pub fn parse_invite_hash(link: &str) -> Option<&str> {
    const MARKERS: [&str; 3] = ["/+", "/joinchat/", "join?invite="];

    let link = link.trim();
    let tail = MARKERS
        .iter()
        .find_map(|marker| link.find(marker).map(|pos| &link[pos + marker.len()..]))?;

    // None of these delimiters can occur inside a base64url invite hash.
    let end = tail
        .find(|c: char| matches!(c, '/' | '?' | '#' | '&'))
        .unwrap_or(tail.len());
    let hash = &tail[..end];

    if hash.is_empty() {
        None
    } else {
        Some(hash)
    }
}
2996
2997 pub async fn get_messages(
3001 &self,
3002 peer: tl::enums::InputPeer,
3003 limit: i32,
3004 offset_id: i32,
3005 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
3006 let req = tl::functions::messages::GetHistory {
3007 peer, offset_id, offset_date: 0, add_offset: 0,
3008 limit, max_id: 0, min_id: 0, hash: 0,
3009 };
3010 let body = self.rpc_call_raw(&req).await?;
3011 let mut cur = Cursor::from_slice(&body);
3012 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
3013 tl::enums::messages::Messages::Messages(m) => m.messages,
3014 tl::enums::messages::Messages::Slice(m) => m.messages,
3015 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
3016 tl::enums::messages::Messages::NotModified(_) => vec![],
3017 };
3018 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
3019 }
3020
3021 pub async fn resolve_peer(
3025 &self,
3026 peer: &str,
3027 ) -> Result<tl::enums::Peer, InvocationError> {
3028 match peer.trim() {
3029 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
3030 username if username.starts_with('@') => {
3031 self.resolve_username(&username[1..]).await
3032 }
3033 id_str => {
3034 if let Ok(id) = id_str.parse::<i64>() {
3035 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
3036 } else {
3037 Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
3038 }
3039 }
3040 }
3041 }
3042
3043 pub async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
3047 let req = tl::functions::contacts::ResolveUsername {
3048 username: username.to_string(), referer: None,
3049 };
3050 let body = self.rpc_call_raw(&req).await?;
3051 let mut cur = Cursor::from_slice(&body);
3052 let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
3053 tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
3054 };
3055 self.cache_users_slice(&resolved.users).await;
3057 self.cache_chats_slice(&resolved.chats).await;
3058 Ok(resolved.peer)
3059 }
3060
3061 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
3065 let body = self.rpc_call_raw(req).await?;
3066 let mut cur = Cursor::from_slice(&body);
3067 R::Return::deserialize(&mut cur).map_err(Into::into)
3068 }
3069
3070 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
3071 let mut fail_count = NonZeroU32::new(1).unwrap();
3072 let mut slept_so_far = Duration::default();
3073 loop {
3074 match self.do_rpc_call(req).await {
3075 Ok(body) => return Ok(body),
3076 Err(e) => {
3077 let ctx = RetryContext { fail_count, slept_so_far, error: e };
3078 match self.inner.retry_policy.should_retry(&ctx) {
3079 ControlFlow::Continue(delay) => {
3080 sleep(delay).await;
3081 slept_so_far += delay;
3082 fail_count = fail_count.saturating_add(1);
3083 }
3084 ControlFlow::Break(()) => return Err(ctx.error),
3085 }
3086 }
3087 }
3088 }
3089 }
3090
/// Performs a single attempt of `req`: frames it, writes it to the current
/// connection and waits up to 30 seconds for the reply body.
///
/// Acknowledgements queued in the writer's `pending_ack` are drained and, if
/// there are any, sent together with the request in one container.
///
/// # Errors
/// - Any error from `send_frame_write`.
/// - `InvocationError::Deserialize` if the reply channel is closed or no
///   reply arrives within 30 seconds.
/// - Whatever error is delivered through the reply channel.
async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
    // The reply is delivered through this channel by whoever holds `tx`
    // (presumably the reader task; confirm).
    let (tx, rx) = oneshot::channel();
    {
        let raw_body = req.to_bytes();
        // presumably gzip-packs the body when worthwhile; see `maybe_gz_pack`
        let body = maybe_gz_pack(&raw_body);

        // The writer lock is held for the rest of this block, so framing and
        // sending are serialized with other senders.
        let mut w = self.inner.writer.lock().await;
        let fk = w.frame_kind.clone();

        // Take every ack queued so far; they are sent along with this request.
        let acks: Vec<i64> = w.pending_ack.drain(..).collect();

        if acks.is_empty() {
            // No acks to bundle: send the request as a standalone message.
            let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
            // Register the body and the reply sender under the message ID
            // BEFORE writing, so a fast reply cannot arrive unregistered.
            w.sent_bodies.insert(msg_id, body);
            self.inner.pending.lock().await.insert(msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
        } else {
            // Bundle acks + request into one container. IDs are allocated
            // for the ack first, then for the request.
            let ack_body = build_msgs_ack_body(&acks);
            let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
            let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
            let container_payload = build_container_body(&[
                (ack_msg_id, ack_seqno, ack_body.as_slice()),
                (req_msg_id, req_seqno, body.as_slice()),
            ]);

            let wire = w.enc.pack_container(&container_payload);

            // Only the request is tracked; the ack expects no reply here.
            w.sent_bodies.insert(req_msg_id, body);
            self.inner.pending.lock().await.insert(req_msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
            log::trace!("[layer] G-07 container: bundled {} acks + request", acks.len());
        }
    }
    // The writer lock is released here, before waiting for the reply.
    // NOTE(review): if the send fails or this wait times out, the entries
    // inserted into `sent_bodies` / `pending` above are not removed by this
    // function — confirm they are cleaned up elsewhere.
    match tokio::time::timeout(Duration::from_secs(30), rx).await {
        Ok(Ok(result)) => result,
        Ok(Err(_)) => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
        Err(_) => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
    }
}
3149
3150 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
3153 let mut fail_count = NonZeroU32::new(1).unwrap();
3154 let mut slept_so_far = Duration::default();
3155 loop {
3156 let result = self.do_rpc_write(req).await;
3157 match result {
3158 Ok(()) => return Ok(()),
3159 Err(e) => {
3160 let ctx = RetryContext { fail_count, slept_so_far, error: e };
3161 match self.inner.retry_policy.should_retry(&ctx) {
3162 ControlFlow::Continue(delay) => {
3163 sleep(delay).await;
3164 slept_so_far += delay;
3165 fail_count = fail_count.saturating_add(1);
3166 }
3167 ControlFlow::Break(()) => return Err(ctx.error),
3168 }
3169 }
3170 }
3171 }
3172 }
3173
/// Performs a single attempt of a write-style request: frames `req`, writes
/// it to the current connection and waits up to 30 seconds for the reply,
/// whose body is discarded.
///
/// Acknowledgements queued in the writer's `pending_ack` are drained and, if
/// there are any, sent together with the request in one container.
///
/// # Errors
/// - Any error from `send_frame_write`.
/// - `InvocationError::Deserialize` if the reply channel is closed or no
///   reply arrives within 30 seconds.
/// - Whatever error is delivered through the reply channel.
async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
    // The reply is delivered through this channel by whoever holds `tx`
    // (presumably the reader task; confirm).
    let (tx, rx) = oneshot::channel();
    {
        let raw_body = req.to_bytes();
        // presumably gzip-packs the body when worthwhile; see `maybe_gz_pack`
        let body = maybe_gz_pack(&raw_body);

        // The writer lock is held for the rest of this block.
        let mut w = self.inner.writer.lock().await;
        let fk = w.frame_kind.clone();

        // Take every ack queued so far; they are sent along with this request.
        let acks: Vec<i64> = w.pending_ack.drain(..).collect();

        if acks.is_empty() {
            // No acks to bundle: send the request as a standalone message.
            let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
            // Register body and reply sender BEFORE writing the frame.
            w.sent_bodies.insert(msg_id, body);
            self.inner.pending.lock().await.insert(msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
        } else {
            // Bundle acks + request into one container. IDs are allocated
            // for the ack first, then for the request.
            let ack_body = build_msgs_ack_body(&acks);
            let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
            let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
            let container_payload = build_container_body(&[
                (ack_msg_id, ack_seqno, ack_body.as_slice()),
                (req_msg_id, req_seqno, body.as_slice()),
            ]);
            let wire = w.enc.pack_container(&container_payload);
            // Only the request is tracked; the ack expects no reply here.
            w.sent_bodies.insert(req_msg_id, body);
            self.inner.pending.lock().await.insert(req_msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
            log::trace!("[layer] G-07 write container: bundled {} acks + write", acks.len());
        }
    }
    // The writer lock is released here, before waiting for the reply.
    // NOTE(review): if the send fails or this wait times out, the entries
    // inserted into `sent_bodies` / `pending` above are not removed by this
    // function — confirm they are cleaned up elsewhere.
    match tokio::time::timeout(Duration::from_secs(30), rx).await {
        // A successful reply body is dropped; only success/failure matters.
        Ok(Ok(result)) => result.map(|_| ()),
        Ok(Err(_)) => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
        Err(_) => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
    }
}
3213
3214 async fn init_connection(&self) -> Result<(), InvocationError> {
3217 use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
3218 let req = InvokeWithLayer {
3219 layer: tl::LAYER,
3220 query: InitConnection {
3221 api_id: self.inner.api_id,
3222 device_model: "Linux".to_string(),
3223 system_version: "1.0".to_string(),
3224 app_version: env!("CARGO_PKG_VERSION").to_string(),
3225 system_lang_code: "en".to_string(),
3226 lang_pack: "".to_string(),
3227 lang_code: "en".to_string(),
3228 proxy: None,
3229 params: None,
3230 query: GetConfig {},
3231 },
3232 };
3233
3234 let body = self.rpc_call_raw_serializable(&req).await?;
3236
3237 let mut cur = Cursor::from_slice(&body);
3238 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
3239 let allow_ipv6 = self.inner.allow_ipv6;
3240 let mut opts = self.inner.dc_options.lock().await;
3241 for opt in &cfg.dc_options {
3242 let tl::enums::DcOption::DcOption(o) = opt;
3243 if o.media_only || o.cdn || o.tcpo_only { continue; }
3244 if o.ipv6 && !allow_ipv6 { continue; }
3245 let addr = format!("{}:{}", o.ip_address, o.port);
3246 let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
3247 dc_id: o.id, addr: addr.clone(),
3248 auth_key: None, first_salt: 0, time_offset: 0,
3249 });
3250 entry.addr = addr;
3251 }
3252 log::info!("[layer] initConnection ✓ ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
3253 }
3254 Ok(())
3255 }
3256
/// Switches the client's main connection to data centre `new_dc_id`.
///
/// Steps, in order:
/// 1. Look up the DC's address (with a hard-coded fallback) and any saved
///    auth key.
/// 2. Open a new connection, reusing the saved key when there is one.
/// 3. Store the connection's auth key in the DC table.
/// 4. Replace the shared writer, record the new home DC and hand the read
///    half to the receiver of `reconnect_tx`.
/// 5. Run `init_connection`, sleeping and retrying on flood-wait errors.
/// 6. Persist the session (best-effort).
///
/// # Errors
/// Fails if the connection cannot be established or `init_connection` fails
/// with anything other than a flood-wait error.
async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
    let addr = {
        let opts = self.inner.dc_options.lock().await;
        // Unknown DC: fall back to a fixed address.
        // NOTE(review): the fallback is the same for every DC id — confirm
        // this is intended.
        opts.get(&new_dc_id).map(|e| e.addr.clone())
            .unwrap_or_else(|| "149.154.167.51:443".to_string())
    };
    log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");

    let saved_key = {
        let opts = self.inner.dc_options.lock().await;
        opts.get(&new_dc_id).and_then(|e| e.auth_key)
    };

    let socks5 = self.inner.socks5.clone();
    let transport = self.inner.transport.clone();
    let conn = if let Some(key) = saved_key {
        // The two zeros are presumably the initial salt and time offset — TODO confirm.
        Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
    } else {
        Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
    };

    // Remember the key of the new connection so later connections can reuse it.
    let new_key = conn.auth_key_bytes();
    {
        let mut opts = self.inner.dc_options.lock().await;
        let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
            dc_id: new_dc_id, addr: addr.clone(),
            auth_key: None, first_salt: 0, time_offset: 0,
        });
        entry.auth_key = Some(new_key);
    }

    let (new_writer, new_read, new_fk) = conn.into_writer();
    // Read these before `new_writer` is moved into the shared slot.
    let new_ak = new_writer.enc.auth_key_bytes();
    let new_sid = new_writer.enc.session_id();
    *self.inner.writer.lock().await = new_writer;
    *self.inner.home_dc_id.lock().await = new_dc_id;

    // Hand the new read half over; a send failure (no receiver) is ignored.
    let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

    // NOTE(review): this loop has no upper bound on flood-wait retries.
    loop {
        match self.init_connection().await {
            Ok(()) => break,
            Err(InvocationError::Rpc(ref r))
                if r.flood_wait_seconds().is_some() =>
            {
                // The guard above guarantees this is `Some`.
                let secs = r.flood_wait_seconds().unwrap();
                log::warn!(
                    "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs} — waiting"
                );
                // Wait one second longer than the server asked for.
                sleep(Duration::from_secs(secs + 1)).await;
            }
            Err(e) => return Err(e),
        }
    }

    // Best-effort: a failure to persist the session does not fail the migration.
    self.save_session().await.ok();
    log::info!("[layer] Now on DC{new_dc_id} ✓");
    Ok(())
}
3330
3331 pub fn disconnect(&self) {
3341 self.inner.shutdown_token.cancel();
3342 }
3343
3344 pub async fn sync_update_state(&self) {
3352 let _ = self.sync_pts_state().await;
3353 }
3354
3355 async fn cache_user(&self, user: &tl::enums::User) {
3358 self.inner.peer_cache.write().await.cache_user(user);
3359 }
3360
3361 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
3362 let mut cache = self.inner.peer_cache.write().await;
3363 cache.cache_users(users);
3364 }
3365
3366 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
3367 let mut cache = self.inner.peer_cache.write().await;
3368 cache.cache_chats(chats);
3369 }
3370
3371 #[doc(hidden)]
3373 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
3374 self.cache_users_slice(users).await;
3375 }
3376
3377 #[doc(hidden)]
3378 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
3379 self.cache_chats_slice(chats).await;
3380 }
3381
3382 #[doc(hidden)]
3384 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
3385 self.rpc_call_raw(req).await
3386 }
3387
3388 async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
3390 let mut fail_count = NonZeroU32::new(1).unwrap();
3391 let mut slept_so_far = Duration::default();
3392 loop {
3393 match self.do_rpc_write_returning_body(req).await {
3394 Ok(body) => return Ok(body),
3395 Err(e) => {
3396 let ctx = RetryContext { fail_count, slept_so_far, error: e };
3397 match self.inner.retry_policy.should_retry(&ctx) {
3398 ControlFlow::Continue(delay) => {
3399 sleep(delay).await;
3400 slept_so_far += delay;
3401 fail_count = fail_count.saturating_add(1);
3402 }
3403 ControlFlow::Break(()) => return Err(ctx.error),
3404 }
3405 }
3406 }
3407 }
3408 }
3409
/// Performs a single attempt of a serializable request and returns the raw
/// reply body: frames `req`, writes it to the current connection and waits
/// up to 30 seconds for the reply.
///
/// Acknowledgements queued in the writer's `pending_ack` are drained and, if
/// there are any, sent together with the request in one container.
///
/// # Errors
/// - Any error from `send_frame_write`.
/// - `InvocationError::Deserialize` if the reply channel is closed or no
///   reply arrives within 30 seconds.
/// - Whatever error is delivered through the reply channel.
async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
    // The reply is delivered through this channel by whoever holds `tx`
    // (presumably the reader task; confirm).
    let (tx, rx) = oneshot::channel();
    {
        let raw_body = req.to_bytes();
        // presumably gzip-packs the body when worthwhile; see `maybe_gz_pack`
        let body = maybe_gz_pack(&raw_body);
        // The writer lock is held for the rest of this block.
        let mut w = self.inner.writer.lock().await;
        let fk = w.frame_kind.clone();
        // Take every ack queued so far; they are sent along with this request.
        let acks: Vec<i64> = w.pending_ack.drain(..).collect();
        if acks.is_empty() {
            // No acks to bundle: send the request as a standalone message.
            let (wire, msg_id) = w.enc.pack_body_with_msg_id(&body, true);
            // Register body and reply sender BEFORE writing the frame.
            w.sent_bodies.insert(msg_id, body);
            self.inner.pending.lock().await.insert(msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
        } else {
            // Bundle acks + request into one container. IDs are allocated
            // for the ack first, then for the request.
            let ack_body = build_msgs_ack_body(&acks);
            let (ack_msg_id, ack_seqno) = w.enc.alloc_msg_seqno(false);
            let (req_msg_id, req_seqno) = w.enc.alloc_msg_seqno(true);
            let container_payload = build_container_body(&[
                (ack_msg_id, ack_seqno, ack_body.as_slice()),
                (req_msg_id, req_seqno, body.as_slice()),
            ]);
            let wire = w.enc.pack_container(&container_payload);
            // Only the request is tracked; the ack expects no reply here.
            w.sent_bodies.insert(req_msg_id, body);
            self.inner.pending.lock().await.insert(req_msg_id, tx);
            send_frame_write(&mut w.write_half, &wire, &fk).await?;
        }
    }
    // The writer lock is released here, before waiting for the reply.
    // NOTE(review): if the send fails or this wait times out, the entries
    // inserted into `sent_bodies` / `pending` above are not removed by this
    // function — confirm they are cleaned up elsewhere.
    match tokio::time::timeout(Duration::from_secs(30), rx).await {
        Ok(Ok(result)) => result,
        Ok(Err(_)) => Err(InvocationError::Deserialize("rpc channel closed".into())),
        Err(_) => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
    }
}
3443
3444 pub fn iter_dialogs(&self) -> DialogIter {
3461 DialogIter {
3462 offset_date: 0,
3463 offset_id: 0,
3464 offset_peer: tl::enums::InputPeer::Empty,
3465 done: false,
3466 buffer: VecDeque::new(),
3467 total: None,
3468 }
3469 }
3470
3471 pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
3485 MessageIter {
3486 peer,
3487 offset_id: 0,
3488 done: false,
3489 buffer: VecDeque::new(),
3490 total: None,
3491 }
3492 }
3493
3494 pub async fn resolve_to_input_peer(
3499 &self,
3500 peer: &tl::enums::Peer,
3501 ) -> Result<tl::enums::InputPeer, InvocationError> {
3502 let cache = self.inner.peer_cache.read().await;
3503 match peer {
3504 tl::enums::Peer::User(u) => {
3505 if u.user_id == 0 {
3506 return Ok(tl::enums::InputPeer::PeerSelf);
3507 }
3508 match cache.users.get(&u.user_id) {
3509 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
3510 user_id: u.user_id, access_hash: hash,
3511 })),
3512 None => Err(InvocationError::Deserialize(format!(
3513 "access_hash unknown for user {}; resolve via username first", u.user_id
3514 ))),
3515 }
3516 }
3517 tl::enums::Peer::Chat(c) => {
3518 Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
3519 }
3520 tl::enums::Peer::Channel(c) => {
3521 match cache.channels.get(&c.channel_id) {
3522 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
3523 channel_id: c.channel_id, access_hash: hash,
3524 })),
3525 None => Err(InvocationError::Deserialize(format!(
3526 "access_hash unknown for channel {}; resolve via username first", c.channel_id
3527 ))),
3528 }
3529 }
3530 }
3531 }
3532
3533 pub async fn invoke_on_dc<R: RemoteCall>(
3541 &self,
3542 dc_id: i32,
3543 req: &R,
3544 ) -> Result<R::Return, InvocationError> {
3545 let body = self.rpc_on_dc_raw(dc_id, req).await?;
3546 let mut cur = Cursor::from_slice(&body);
3547 R::Return::deserialize(&mut cur).map_err(Into::into)
3548 }
3549
/// Sends `req` to data centre `dc_id` through the DC pool and returns the
/// raw reply body, opening a pooled connection first when none exists.
///
/// When a new connection is needed it is created with the DC's saved auth
/// key if there is one. Otherwise a fresh connection is made and, for a
/// non-home DC, authorization is exported/imported (failure is only
/// logged). The resulting auth key is stored back in the DC table.
///
/// # Errors
/// - `InvocationError::Deserialize("unknown DC…")` when the DC has no
///   address entry.
/// - Any connection error, or whatever the pooled call reports.
async fn rpc_on_dc_raw<R: RemoteCall>(
    &self,
    dc_id: i32,
    req: &R,
) -> Result<Vec<u8>, InvocationError> {
    // The pool lock is released at the end of this block.
    // NOTE(review): two concurrent callers could therefore both see
    // "no connection" and each open one — confirm this is acceptable.
    let needs_new = {
        let pool = self.inner.dc_pool.lock().await;
        !pool.has_connection(dc_id)
    };

    if needs_new {
        let addr = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&dc_id).map(|e| e.addr.clone())
                .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
        };

        let socks5 = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();
        let saved_key = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&dc_id).and_then(|e| e.auth_key)
        };

        let dc_conn = if let Some(key) = saved_key {
            // The two zeros are presumably the initial salt and time offset — TODO confirm.
            dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
        } else {
            let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
            let home_dc_id = *self.inner.home_dc_id.lock().await;
            // A fresh key on a foreign DC is not authorized yet; try to
            // carry the login over. A failure is logged, not returned.
            if dc_id != home_dc_id {
                if let Err(e) = self.export_import_auth(dc_id, &conn).await {
                    log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
                }
            }
            conn
        };

        // Remember the key so the next connection to this DC can reuse it.
        let key = dc_conn.auth_key_bytes();
        {
            let mut opts = self.inner.dc_options.lock().await;
            if let Some(e) = opts.get_mut(&dc_id) {
                e.auth_key = Some(key);
            }
        }
        self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
    }

    // Snapshot the DC table first so its lock is not held during the call.
    let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
    // NOTE(review): the pool lock is held for the whole remote call, which
    // serializes all pooled requests — confirm this is intended.
    self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
}
3603
3604 async fn export_import_auth(
3606 &self,
3607 dc_id: i32,
3608 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
3610 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
3612 let body = self.rpc_call_raw(&export_req).await?;
3613 let mut cur = Cursor::from_slice(&body);
3614 let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
3615 tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
3616 };
3617
3618 let import_req = tl::functions::auth::ImportAuthorization {
3620 id: exported.id,
3621 bytes: exported.bytes,
3622 };
3623 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
3624 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
3625 log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
3626 Ok(())
3627 }
3628
3629 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
3632 let body = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
3633 let mut cur = Cursor::from_slice(&body);
3634 let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
3635 tl::enums::account::Password::Password(p) => p,
3636 };
3637 Ok(PasswordToken { password: pw })
3638 }
3639
3640 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
3641 tl::functions::auth::SendCode {
3642 phone_number: phone.to_string(),
3643 api_id: self.inner.api_id,
3644 api_hash: self.inner.api_hash.clone(),
3645 settings: tl::enums::CodeSettings::CodeSettings(
3646 tl::types::CodeSettings {
3647 allow_flashcall: false, current_number: false, allow_app_hash: false,
3648 allow_missed_call: false, allow_firebase: false, unknown_number: false,
3649 logout_tokens: None, token: None, app_sandbox: None,
3650 },
3651 ),
3652 }
3653 }
3654
3655 fn extract_user_name(user: &tl::enums::User) -> String {
3656 match user {
3657 tl::enums::User::User(u) => {
3658 format!("{} {}",
3659 u.first_name.as_deref().unwrap_or(""),
3660 u.last_name.as_deref().unwrap_or(""))
3661 .trim().to_string()
3662 }
3663 tl::enums::User::Empty(_) => "(unknown)".into(),
3664 }
3665 }
3666
3667 fn extract_password_params(
3668 algo: &tl::enums::PasswordKdfAlgo,
3669 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
3670 match algo {
3671 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
3672 Ok((&a.salt1, &a.salt2, &a.p, a.g))
3673 }
3674 _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
3675 }
3676 }
3677}
3678
/// Paginating cursor over dialogs.
///
/// Call `next` repeatedly with a client; pages are fetched with
/// `messages.getDialogs` and handed out one dialog at a time.
pub struct DialogIter {
    /// `offset_date` sent with the next page request; taken from the last
    /// dialog's message of the previous page (0 when it has none).
    offset_date: i32,
    /// `offset_id` sent with the next page request (the last dialog's top message).
    offset_id: i32,
    /// `offset_peer` sent with the next page request.
    offset_peer: tl::enums::InputPeer,
    /// Set once a page shorter than the page size was received.
    done: bool,
    /// Dialogs already fetched but not yet returned by `next`.
    buffer: VecDeque<Dialog>,
    /// Count reported alongside the first page that carried one, if any.
    pub total: Option<i32>,
}
3692
3693impl DialogIter {
3694 const PAGE_SIZE: i32 = 100;
3695
3696 pub fn total(&self) -> Option<i32> { self.total }
3702
3703 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
3705 if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
3706 if self.done { return Ok(None); }
3707
3708 let req = tl::functions::messages::GetDialogs {
3709 exclude_pinned: false,
3710 folder_id: None,
3711 offset_date: self.offset_date,
3712 offset_id: self.offset_id,
3713 offset_peer: self.offset_peer.clone(),
3714 limit: Self::PAGE_SIZE,
3715 hash: 0,
3716 };
3717
3718 let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
3719 if self.total.is_none() {
3721 self.total = count;
3722 }
3723 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
3724 self.done = true;
3725 }
3726
3727 if let Some(last) = dialogs.last() {
3729 self.offset_date = last.message.as_ref().map(|m| match m {
3730 tl::enums::Message::Message(x) => x.date,
3731 tl::enums::Message::Service(x) => x.date,
3732 _ => 0,
3733 }).unwrap_or(0);
3734 self.offset_id = last.top_message();
3735 if let Some(peer) = last.peer() {
3736 self.offset_peer = client.inner.peer_cache.read().await.peer_to_input(peer);
3737 }
3738 }
3739
3740 self.buffer.extend(dialogs);
3741 Ok(self.buffer.pop_front())
3742 }
3743}
3744
/// Paginating cursor over the messages of one peer.
///
/// Call `next` repeatedly with a client; pages are fetched on demand and
/// handed out one message at a time.
pub struct MessageIter {
    /// Peer whose messages are listed.
    peer: tl::enums::Peer,
    /// Offset passed to the next page request (id of the last message received so far).
    offset_id: i32,
    /// Set once a page shorter than the page size was received.
    done: bool,
    /// Messages already fetched but not yet returned by `next`.
    buffer: VecDeque<update::IncomingMessage>,
    /// Count reported alongside the first page that carried one, if any.
    pub total: Option<i32>,
}
3756
3757impl MessageIter {
3758 const PAGE_SIZE: i32 = 100;
3759
3760 pub fn total(&self) -> Option<i32> { self.total }
3765
3766 pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
3768 if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
3769 if self.done { return Ok(None); }
3770
3771 let input_peer = client.inner.peer_cache.read().await.peer_to_input(&self.peer);
3772 let (page, count) = client.get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
3773
3774 if self.total.is_none() { self.total = count; }
3775
3776 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
3777 self.done = true;
3778 }
3779 if let Some(last) = page.last() {
3780 self.offset_id = last.id();
3781 }
3782
3783 self.buffer.extend(page);
3784 Ok(self.buffer.pop_front())
3785 }
3786}
3787
3788#[doc(hidden)]
3792pub fn random_i64_pub() -> i64 { random_i64() }
3793
/// Framing used to delimit packets on a connection; chosen from the
/// configured transport when the stream is opened.
#[derive(Clone)]
enum FrameKind {
    /// Length header counted in 4-byte words (one byte, or `0x7f` + three bytes).
    Abridged,
    /// Four-byte little-endian byte-length header.
    Intermediate,
    /// Carries send/receive sequence counters; the framing helpers in this
    /// file currently treat it exactly like `Intermediate` and never read the
    /// counters (hence the `dead_code` allowance).
    #[allow(dead_code)]
    Full { send_seqno: u32, recv_seqno: u32 },
}
3804
3805
/// Write side of a split connection, together with the encrypted session
/// state needed to pack outgoing messages.
struct ConnectionWriter {
    /// Owned write half of the TCP stream.
    write_half: OwnedWriteHalf,
    /// Encrypted session used to pack outgoing requests.
    enc: EncryptedSession,
    /// Framing to apply to every outgoing packet.
    frame_kind: FrameKind,
    /// NOTE(review): presumably ids of received messages still to be
    /// acknowledged — confirm against the code that fills it.
    pending_ack: Vec<i64>,
    /// NOTE(review): presumably serialized request bodies keyed by message
    /// id, kept for re-sending — confirm against the code that uses it.
    sent_bodies: std::collections::HashMap<i64, Vec<u8>>,
}
3823
3824impl ConnectionWriter {
3825 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
3826 fn first_salt(&self) -> i64 { self.enc.salt }
3827 fn time_offset(&self) -> i32 { self.enc.time_offset }
3828}
3829
/// A connected, not-yet-split TCP stream plus the encrypted session that
/// belongs to it. Turned into reader/writer halves by `into_writer`.
struct Connection {
    /// Underlying TCP stream (the transport init bytes were already sent).
    stream: TcpStream,
    /// Encrypted session built from the auth key, salt and time offset.
    enc: EncryptedSession,
    /// Framing negotiated for this stream.
    frame_kind: FrameKind,
}
3835
3836impl Connection {
3837 async fn open_stream(
3839 addr: &str,
3840 socks5: Option<&crate::socks5::Socks5Config>,
3841 transport: &TransportKind,
3842 ) -> Result<(TcpStream, FrameKind), InvocationError> {
3843 let stream = match socks5 {
3844 Some(proxy) => proxy.connect(addr).await?,
3845 None => {
3846 let stream = TcpStream::connect(addr).await
3849 .map_err(InvocationError::Io)?;
3850
3851 stream.set_nodelay(true).ok();
3857
3858 {
3864 let sock = socket2::SockRef::from(&stream);
3865 let keepalive = TcpKeepalive::new()
3866 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
3867 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
3868 #[cfg(not(target_os = "windows"))]
3869 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
3870 sock.set_tcp_keepalive(&keepalive).ok();
3871 }
3872 stream
3873 }
3874 };
3875 Self::apply_transport_init(stream, transport).await
3876 }
3877
3878 async fn apply_transport_init(
3880 mut stream: TcpStream,
3881 transport: &TransportKind,
3882 ) -> Result<(TcpStream, FrameKind), InvocationError> {
3883 match transport {
3884 TransportKind::Abridged => {
3885 stream.write_all(&[0xef]).await?;
3886 Ok((stream, FrameKind::Abridged))
3887 }
3888 TransportKind::Intermediate => {
3889 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
3890 Ok((stream, FrameKind::Intermediate))
3891 }
3892 TransportKind::Full => {
3893 Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
3895 }
3896 TransportKind::Obfuscated { secret } => {
3897 let mut nonce = [0u8; 64];
3906 getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
3907 let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
3909 let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
3910 let mut handshake = nonce;
3912 handshake[56] = 0xef; handshake[57] = 0xef;
3913 handshake[58] = 0xef; handshake[59] = 0xef;
3914 enc_cipher.apply(&mut handshake[56..]);
3915 stream.write_all(&handshake).await?;
3916 Ok((stream, FrameKind::Abridged))
3917 }
3918 }
3919 }
3920
3921 async fn connect_raw(
3922 addr: &str,
3923 socks5: Option<&crate::socks5::Socks5Config>,
3924 transport: &TransportKind,
3925 ) -> Result<Self, InvocationError> {
3926 log::info!("[layer] Connecting to {addr} (DH) …");
3927
3928 let addr2 = addr.to_string();
3932 let socks5_c = socks5.cloned();
3933 let transport_c = transport.clone();
3934
3935 let fut = async move {
3936 let (mut stream, frame_kind) =
3937 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
3938
3939 let mut plain = Session::new();
3940
3941 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3942 send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
3943 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
3944
3945 let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3946 send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
3947 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
3948
3949 let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3950 send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
3951 let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;
3952
3953 let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3954 log::info!("[layer] DH complete ✓");
3955
3956 Ok::<Self, InvocationError>(Self {
3957 stream,
3958 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
3959 frame_kind,
3960 })
3961 };
3962
3963 tokio::time::timeout(Duration::from_secs(15), fut)
3964 .await
3965 .map_err(|_| InvocationError::Deserialize(
3966 format!("DH handshake with {addr} timed out after 15 s")
3967 ))?
3968 }
3969
3970 async fn connect_with_key(
3971 addr: &str,
3972 auth_key: [u8; 256],
3973 first_salt: i64,
3974 time_offset: i32,
3975 socks5: Option<&crate::socks5::Socks5Config>,
3976 transport: &TransportKind,
3977 ) -> Result<Self, InvocationError> {
3978 let addr2 = addr.to_string();
3979 let socks5_c = socks5.cloned();
3980 let transport_c = transport.clone();
3981
3982 let fut = async move {
3983 let (stream, frame_kind) =
3984 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
3985 Ok::<Self, InvocationError>(Self {
3986 stream,
3987 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
3988 frame_kind,
3989 })
3990 };
3991
3992 tokio::time::timeout(Duration::from_secs(15), fut)
3993 .await
3994 .map_err(|_| InvocationError::Deserialize(
3995 format!("connect_with_key to {addr} timed out after 15 s")
3996 ))?
3997 }
3998
3999 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
4000
4001 fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
4003 let (read_half, write_half) = self.stream.into_split();
4004 let writer = ConnectionWriter {
4005 write_half,
4006 enc: self.enc,
4007 frame_kind: self.frame_kind.clone(),
4008 pending_ack: Vec::new(),
4009 sent_bodies: std::collections::HashMap::new(),
4010 };
4011 (writer, read_half, self.frame_kind)
4012 }
4013}
4014
4015async fn send_frame(
4019 stream: &mut TcpStream,
4020 data: &[u8],
4021 kind: &FrameKind,
4022) -> Result<(), InvocationError> {
4023 match kind {
4024 FrameKind::Abridged => send_abridged(stream, data).await,
4025 FrameKind::Intermediate | FrameKind::Full { .. } => {
4026 let mut frame = Vec::with_capacity(4 + data.len());
4028 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
4029 frame.extend_from_slice(data);
4030 stream.write_all(&frame).await?;
4031 Ok(())
4032 }
4033 }
4034}
4035
/// Result of one `recv_frame_with_keepalive` call.
enum FrameOutcome {
    /// A complete frame was read; carries its raw bytes.
    Frame(Vec<u8>),
    /// Reading the frame, or sending the keepalive ping, failed.
    Error(InvocationError),
    /// No frame arrived within the ping delay and a ping was sent instead.
    Keepalive,
}
4044
4045async fn recv_frame_with_keepalive(
4052 rh: &mut OwnedReadHalf,
4053 fk: &FrameKind,
4054 client: &Client,
4055 _ak: &[u8; 256],
4056) -> FrameOutcome {
4057 match tokio::time::timeout(Duration::from_secs(PING_DELAY_SECS), recv_frame_read(rh, fk)).await {
4058 Ok(Ok(raw)) => FrameOutcome::Frame(raw),
4059 Ok(Err(e)) => FrameOutcome::Error(e),
4060 Err(_) => {
4061 let ping_req = tl::functions::PingDelayDisconnect {
4065 ping_id: random_i64(),
4066 disconnect_delay: NO_PING_DISCONNECT,
4067 };
4068 let mut w = client.inner.writer.lock().await;
4069 let wire = w.enc.pack(&ping_req);
4070 let fk = w.frame_kind.clone();
4071 match send_frame_write(&mut w.write_half, &wire, &fk).await {
4072 Ok(()) => FrameOutcome::Keepalive,
4073 Err(e) => FrameOutcome::Error(e),
4077 }
4078 }
4079 }
4080}
4081
4082async fn send_frame_write(
4089 stream: &mut OwnedWriteHalf,
4090 data: &[u8],
4091 kind: &FrameKind,
4092) -> Result<(), InvocationError> {
4093 match kind {
4094 FrameKind::Abridged => {
4095 let words = data.len() / 4;
4096 let mut frame = if words < 0x7f {
4098 let mut v = Vec::with_capacity(1 + data.len());
4099 v.push(words as u8);
4100 v
4101 } else {
4102 let mut v = Vec::with_capacity(4 + data.len());
4103 v.extend_from_slice(&[
4104 0x7f,
4105 (words & 0xff) as u8,
4106 ((words >> 8) & 0xff) as u8,
4107 ((words >> 16) & 0xff) as u8,
4108 ]);
4109 v
4110 };
4111 frame.extend_from_slice(data);
4112 stream.write_all(&frame).await?;
4113 Ok(())
4114 }
4115 FrameKind::Intermediate | FrameKind::Full { .. } => {
4116 let mut frame = Vec::with_capacity(4 + data.len());
4117 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
4118 frame.extend_from_slice(data);
4119 stream.write_all(&frame).await?;
4120 Ok(())
4121 }
4122 }
4123}
4124
4125async fn recv_frame_read(
4127 stream: &mut OwnedReadHalf,
4128 kind: &FrameKind,
4129) -> Result<Vec<u8>, InvocationError> {
4130 match kind {
4131 FrameKind::Abridged => {
4132 let mut h = [0u8; 1];
4133 stream.read_exact(&mut h).await?;
4134 let words = if h[0] < 0x7f {
4135 h[0] as usize
4136 } else {
4137 let mut b = [0u8; 3];
4138 stream.read_exact(&mut b).await?;
4139 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
4140 };
4141 let len = words * 4;
4142 let mut buf = vec![0u8; len];
4143 stream.read_exact(&mut buf).await?;
4144 Ok(buf)
4145 }
4146 FrameKind::Intermediate | FrameKind::Full { .. } => {
4147 let mut len_buf = [0u8; 4];
4148 stream.read_exact(&mut len_buf).await?;
4149 let len = u32::from_le_bytes(len_buf) as usize;
4150 let mut buf = vec![0u8; len];
4151 stream.read_exact(&mut buf).await?;
4152 Ok(buf)
4153 }
4154 }
4155}
4156
4157
4158async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
4160 let words = data.len() / 4;
4161 let mut frame = if words < 0x7f {
4163 let mut v = Vec::with_capacity(1 + data.len());
4164 v.push(words as u8);
4165 v
4166 } else {
4167 let mut v = Vec::with_capacity(4 + data.len());
4168 v.extend_from_slice(&[
4169 0x7f,
4170 (words & 0xff) as u8,
4171 ((words >> 8) & 0xff) as u8,
4172 ((words >> 16) & 0xff) as u8,
4173 ]);
4174 v
4175 };
4176 frame.extend_from_slice(data);
4177 stream.write_all(&frame).await?;
4178 Ok(())
4179}
4180
4181async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
4182 let mut h = [0u8; 1];
4183 stream.read_exact(&mut h).await?;
4184 let words = if h[0] < 0x7f {
4185 h[0] as usize
4186 } else {
4187 let mut b = [0u8; 3];
4188 stream.read_exact(&mut b).await?;
4189 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
4190 if w == 1 {
4192 let mut code_buf = [0u8; 4];
4193 stream.read_exact(&mut code_buf).await?;
4194 let code = i32::from_le_bytes(code_buf);
4195 return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
4196 }
4197 w
4198 };
4199 if words == 0 || words > 0x8000 {
4202 return Err(InvocationError::Deserialize(
4203 format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
4204 ));
4205 }
4206 let mut buf = vec![0u8; words * 4];
4207 stream.read_exact(&mut buf).await?;
4208 Ok(buf)
4209}
4210
4211async fn recv_frame_plain<T: Deserializable>(
4213 stream: &mut TcpStream,
4214 _kind: &FrameKind,
4215) -> Result<T, InvocationError> {
4216 let raw = recv_abridged(stream).await?; if raw.len() < 20 {
4218 return Err(InvocationError::Deserialize("plaintext frame too short".into()));
4219 }
4220 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
4221 return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
4222 }
4223 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
4224 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
4225 T::deserialize(&mut cur).map_err(Into::into)
4226}
4227
/// What `unwrap_envelope` found inside a decrypted message body.
enum EnvelopeResult {
    /// Bytes of an RPC reply (or of any object with an unrecognised constructor id).
    Payload(Vec<u8>),
    /// Updates parsed from an updates-type object.
    Updates(Vec<update::Update>),
    /// A service message, or a container holding neither payload nor updates.
    None,
}
4235
4236fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
4237 if body.len() < 4 {
4238 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
4239 }
4240 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
4241
4242 match cid {
4243 ID_RPC_RESULT => {
4244 if body.len() < 12 {
4245 return Err(InvocationError::Deserialize("rpc_result too short".into()));
4246 }
4247 unwrap_envelope(body[12..].to_vec())
4248 }
4249 ID_RPC_ERROR => {
4250 if body.len() < 8 {
4251 return Err(InvocationError::Deserialize("rpc_error too short".into()));
4252 }
4253 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
4254 let message = tl_read_string(&body[8..]).unwrap_or_default();
4255 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
4256 }
4257 ID_MSG_CONTAINER => {
4258 if body.len() < 8 {
4259 return Err(InvocationError::Deserialize("container too short".into()));
4260 }
4261 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
4262 let mut pos = 8usize;
4263 let mut payload: Option<Vec<u8>> = None;
4264 let mut updates_buf: Vec<update::Update> = Vec::new();
4265
4266 for _ in 0..count {
4267 if pos + 16 > body.len() { break; }
4268 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
4269 pos += 16;
4270 if pos + inner_len > body.len() { break; }
4271 let inner = body[pos..pos + inner_len].to_vec();
4272 pos += inner_len;
4273 match unwrap_envelope(inner)? {
4274 EnvelopeResult::Payload(p) => { payload = Some(p); }
4275 EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
4276 EnvelopeResult::None => {}
4277 }
4278 }
4279 if let Some(p) = payload {
4280 Ok(EnvelopeResult::Payload(p))
4281 } else if !updates_buf.is_empty() {
4282 Ok(EnvelopeResult::Updates(updates_buf))
4283 } else {
4284 Ok(EnvelopeResult::None)
4285 }
4286 }
4287 ID_GZIP_PACKED => {
4288 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
4289 unwrap_envelope(gz_inflate(&bytes)?)
4290 }
4291 ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
4296 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
4309 Ok(EnvelopeResult::None)
4310 }
4311 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
4312 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
4313 | ID_UPDATES_TOO_LONG => {
4314 Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
4315 }
4316 _ => Ok(EnvelopeResult::Payload(body)),
4317 }
4318}
4319
4320fn random_i64() -> i64 {
4323 let mut b = [0u8; 8];
4324 getrandom::getrandom(&mut b).expect("getrandom");
4325 i64::from_le_bytes(b)
4326}
4327
4328fn jitter_delay(base_ms: u64) -> Duration {
4332 let mut b = [0u8; 2];
4334 getrandom::getrandom(&mut b).unwrap_or(());
4335 let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; let factor = 0.80 + rand_frac * 0.40; Duration::from_millis((base_ms as f64 * factor) as u64)
4338}
4339
/// Decodes a TL-serialized byte string from the start of `data` and returns a
/// copy of its content (padding is not inspected).
///
/// * Empty input yields an empty vector.
/// * A first byte below 254 is the length itself (1-byte header).
/// * Otherwise the next three bytes are the little-endian length (4-byte header).
///
/// Returns `None` when the header or the announced content is truncated.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let first = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };

    let (len, start) = if first < 254 {
        (usize::from(first), 1usize)
    } else {
        if data.len() < 4 {
            return None;
        }
        let len = usize::from(data[1]) | (usize::from(data[2]) << 8) | (usize::from(data[3]) << 16);
        (len, 4usize)
    };

    // `get` yields `None` exactly when the content runs past the input.
    data.get(start..start + len).map(|content| content.to_vec())
}
4349
4350fn tl_read_string(data: &[u8]) -> Option<String> {
4351 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
4352}
4353
4354fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
4355 use std::io::Read;
4356 let mut out = Vec::new();
4357 if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
4358 return Ok(out);
4359 }
4360 out.clear();
4361 flate2::read::ZlibDecoder::new(data)
4362 .read_to_end(&mut out)
4363 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
4364 Ok(out)
4365}
4366
/// Size limit in bytes: bodies of this length or shorter are never compressed
/// by `maybe_gz_pack`.
const COMPRESSION_THRESHOLD: usize = 512;
4372
/// Serializes `data` as a TL byte string: length header, content, then zero
/// padding up to a multiple of four bytes.
///
/// Lengths below 254 use a one-byte header; longer content uses `0xfe`
/// followed by the low three bytes of the length, little-endian.
fn tl_write_bytes(data: &[u8]) -> Vec<u8> {
    let len = data.len();
    let header_len = if len < 254 { 1 } else { 4 };

    let mut out = Vec::with_capacity(4 + len);
    if header_len == 1 {
        out.push(len as u8);
    } else {
        out.push(0xfe);
        out.extend_from_slice(&(len as u32).to_le_bytes()[..3]);
    }
    out.extend_from_slice(data);

    // Pad header + content to the next 4-byte boundary.
    let pad = (4 - (header_len + len) % 4) % 4;
    out.resize(out.len() + pad, 0u8);
    out
}
4391
4392fn gz_pack_body(data: &[u8]) -> Vec<u8> {
4394 use std::io::Write;
4395 let mut enc = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
4396 let _ = enc.write_all(data);
4397 let compressed = enc.finish().unwrap_or_default();
4398 let mut out = Vec::with_capacity(4 + 4 + compressed.len());
4399 out.extend_from_slice(&ID_GZIP_PACKED.to_le_bytes());
4400 out.extend(tl_write_bytes(&compressed));
4401 out
4402}
4403
4404fn maybe_gz_pack(data: &[u8]) -> Vec<u8> {
4407 if data.len() <= COMPRESSION_THRESHOLD {
4408 return data.to_vec();
4409 }
4410 let packed = gz_pack_body(data);
4411 if packed.len() < data.len() { packed } else { data.to_vec() }
4412}
4413
4414fn build_msgs_ack_body(msg_ids: &[i64]) -> Vec<u8> {
4418 let mut out = Vec::with_capacity(4 + 4 + 4 + msg_ids.len() * 8);
4421 out.extend_from_slice(&ID_MSGS_ACK.to_le_bytes());
4422 out.extend_from_slice(&0x1cb5c415_u32.to_le_bytes()); out.extend_from_slice(&(msg_ids.len() as u32).to_le_bytes());
4424 for &id in msg_ids {
4425 out.extend_from_slice(&id.to_le_bytes());
4426 }
4427 out
4428}
4429
4430fn build_container_body(messages: &[(i64, i32, &[u8])]) -> Vec<u8> {
4438 let total_body: usize = messages.iter().map(|(_, _, b)| 16 + b.len()).sum();
4439 let mut out = Vec::with_capacity(8 + total_body);
4440 out.extend_from_slice(&ID_MSG_CONTAINER.to_le_bytes());
4441 out.extend_from_slice(&(messages.len() as u32).to_le_bytes());
4442 for &(msg_id, seqno, body) in messages {
4443 out.extend_from_slice(&msg_id.to_le_bytes());
4444 out.extend_from_slice(&seqno.to_le_bytes());
4445 out.extend_from_slice(&(body.len() as u32).to_le_bytes());
4446 out.extend_from_slice(body);
4447 }
4448 out
4449}