1#![deny(unsafe_code)]
21
22mod errors;
23mod retry;
24mod session;
25mod transport;
26mod two_factor_auth;
27pub mod update;
28pub mod parsers;
29pub mod media;
30pub mod participants;
31pub mod pts;
32
33pub mod dc_pool;
35pub mod transport_obfuscated;
36pub mod transport_intermediate;
37pub mod socks5;
38pub mod session_backend;
39pub mod inline_iter;
40pub mod typing_guard;
41pub mod keyboard;
42
43#[macro_use]
44pub mod macros;
45
46pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
47pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
48pub use update::Update;
49pub use media::{UploadedFile, DownloadIter};
50pub use participants::Participant;
51pub use typing_guard::TypingGuard;
52pub use socks5::Socks5Config;
53pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
54pub use keyboard::{Button, InlineKeyboard, ReplyKeyboard};
55
56pub use layer_tl_types as tl;
59
60use std::collections::HashMap;
61use std::collections::VecDeque;
62use std::num::NonZeroU32;
63use std::ops::ControlFlow;
64use std::sync::Arc;
65use std::time::Duration;
66
67use layer_mtproto::{EncryptedSession, Session, authentication as auth};
68use layer_tl_types::{Cursor, Deserializable, RemoteCall};
69use session::{DcEntry, PersistedSession};
70use tokio::io::{AsyncReadExt, AsyncWriteExt};
71use tokio::net::TcpStream;
72use tokio::sync::{mpsc, oneshot, Mutex};
73use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
74use tokio::time::sleep;
75use tokio_util::sync::CancellationToken;
76use socket2::TcpKeepalive;
77
// TL constructor IDs. `route_frame` compares these against the first four
// little-endian bytes of a decrypted message body to decide how to dispatch it.
// The names in the comments below follow the public MTProto / Telegram API schema.

/// `rpc_result` — response to a request; carries the request's `msg_id`.
const ID_RPC_RESULT: u32 = 0xf35c6d01;
/// `rpc_error` — only logged when seen at top level by `route_frame`.
const ID_RPC_ERROR: u32 = 0x2144ca19;
/// `msg_container` — several messages packed into one body.
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc;
/// `gzip_packed` — gzip-compressed inner object.
const ID_GZIP_PACKED: u32 = 0x3072cfa1;
/// `pong` — reply to a ping; matched to a pending request by message id.
const ID_PONG: u32 = 0x347773c5;
/// `msgs_ack` — not referenced in the visible part of this file.
const ID_MSGS_ACK: u32 = 0x62d6b459;
/// `bad_server_salt` — carries the salt the writer should switch to.
const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
/// `new_session_created` — not referenced in the visible part of this file.
const ID_NEW_SESSION: u32 = 0x9ec20908;
/// `bad_msg_notification` — not referenced in the visible part of this file.
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811;
// The remaining IDs are the update envelopes that `route_frame` hands to
// `update::parse_updates`.
/// `updates`
const ID_UPDATES: u32 = 0x74ae4240;
/// `updateShort`
const ID_UPDATE_SHORT: u32 = 0x78d4dec1;
/// `updatesCombined`
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
/// `updateShortMessage`
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
/// `updateShortChatMessage`
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
/// `updatesTooLong`
const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e;
95
/// NOTE(review): presumably the interval, in seconds, between keepalive pings;
/// the code that uses it is outside the visible part of this file — confirm.
const PING_DELAY_SECS: u64 = 15;

/// NOTE(review): not referenced in the visible part of this file; the name
/// suggests a disconnect delay tied to pings — confirm unit and meaning.
const NO_PING_DISCONNECT: i32 = 20;

/// Initial reconnect delay in milliseconds used by the reader supervisor, and
/// the floor applied to non-zero initial delays in `do_reconnect_loop`.
const RECONNECT_BASE_MS: u64 = 500;

/// Upper bound, in seconds, on the doubled reconnect delay (applied before jitter).
const RECONNECT_MAX_SECS: u64 = 5;

// NOTE(review): the three TCP keepalive settings below are not referenced in the
// visible part of this file; presumably they configure `socket2::TcpKeepalive` — confirm.
const TCP_KEEPALIVE_IDLE_SECS: u64 = 10;
const TCP_KEEPALIVE_INTERVAL_SECS: u64 = 5;
const TCP_KEEPALIVE_PROBES: u32 = 3;
122
/// In-memory lookup of the access hashes seen for users and channels.
///
/// Filled by `cache_user` / `cache_chat` and read back when an `InputPeer`
/// has to be built from a bare id.
#[derive(Default)]
pub(crate) struct PeerCache {
    /// User id → access hash.
    pub(crate) users: HashMap<i64, i64>,
    /// Channel id → access hash.
    pub(crate) channels: HashMap<i64, i64>,
}
134
135impl PeerCache {
136 fn cache_user(&mut self, user: &tl::enums::User) {
137 if let tl::enums::User::User(u) = user {
138 if let Some(hash) = u.access_hash {
139 self.users.insert(u.id, hash);
140 }
141 }
142 }
143
144 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
145 match chat {
146 tl::enums::Chat::Channel(c) => {
147 if let Some(hash) = c.access_hash {
148 self.channels.insert(c.id, hash);
149 }
150 }
151 tl::enums::Chat::ChannelForbidden(c) => {
152 self.channels.insert(c.id, c.access_hash);
153 }
154 _ => {}
155 }
156 }
157
158 fn cache_users(&mut self, users: &[tl::enums::User]) {
159 for u in users { self.cache_user(u); }
160 }
161
162 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
163 for c in chats { self.cache_chat(c); }
164 }
165
166 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
167 if user_id == 0 {
168 return tl::enums::InputPeer::PeerSelf;
169 }
170 let hash = self.users.get(&user_id).copied().unwrap_or(0);
171 tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
172 }
173
174 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
175 let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
176 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
177 }
178
179 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
180 match peer {
181 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
182 tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
183 tl::types::InputPeerChat { chat_id: c.chat_id }
184 ),
185 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
186 }
187 }
188}
189
/// Builder-style description of an outgoing message.
///
/// Construct one with [`InputMessage::text`] (or via `From<&str>` / `From<String>`)
/// and refine it with the chained setter methods.
// NOTE(review): how the flag fields are mapped onto the send request is not
// visible in this part of the file.
#[derive(Clone, Default)]
pub struct InputMessage {
    /// Message text.
    pub text: String,
    /// Id of the message being replied to, if any (see `reply_header`).
    pub reply_to: Option<i32>,
    pub silent: bool,
    pub background: bool,
    pub clear_draft: bool,
    pub no_webpage: bool,
    /// Optional formatting entities for `text`.
    pub entities: Option<Vec<tl::enums::MessageEntity>>,
    /// Optional reply markup (set by `reply_markup` or `keyboard`).
    pub reply_markup: Option<tl::enums::ReplyMarkup>,
    /// Optional schedule timestamp.
    pub schedule_date: Option<i32>,
}
213
214impl InputMessage {
215 pub fn text(text: impl Into<String>) -> Self {
217 Self { text: text.into(), ..Default::default() }
218 }
219
220 pub fn set_text(mut self, text: impl Into<String>) -> Self {
222 self.text = text.into(); self
223 }
224
225 pub fn reply_to(mut self, id: Option<i32>) -> Self {
227 self.reply_to = id; self
228 }
229
230 pub fn silent(mut self, v: bool) -> Self {
232 self.silent = v; self
233 }
234
235 pub fn background(mut self, v: bool) -> Self {
237 self.background = v; self
238 }
239
240 pub fn clear_draft(mut self, v: bool) -> Self {
242 self.clear_draft = v; self
243 }
244
245 pub fn no_webpage(mut self, v: bool) -> Self {
247 self.no_webpage = v; self
248 }
249
250 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
252 self.entities = Some(e); self
253 }
254
255 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
257 self.reply_markup = Some(rm); self
258 }
259
260 pub fn keyboard(mut self, kb: impl Into<tl::enums::ReplyMarkup>) -> Self {
270 self.reply_markup = Some(kb.into()); self
271 }
272
273 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
275 self.schedule_date = ts; self
276 }
277
278 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
279 self.reply_to.map(|id| {
280 tl::enums::InputReplyTo::Message(
281 tl::types::InputReplyToMessage {
282 reply_to_msg_id: id,
283 top_msg_id: None,
284 reply_to_peer_id: None,
285 quote_text: None,
286 quote_entities: None,
287 quote_offset: None,
288 monoforum_peer_id: None,
289 todo_item_id: None,
290 }
291 )
292 })
293 }
294}
295
296impl From<&str> for InputMessage {
297 fn from(s: &str) -> Self { Self::text(s) }
298}
299
300impl From<String> for InputMessage {
301 fn from(s: String) -> Self { Self::text(s) }
302}
303
/// Selects the framing used on the connection.
// NOTE(review): the variant names presumably correspond to the MTProto
// transports of the same names; the framing code is outside this view.
#[derive(Clone, Debug, Default)]
pub enum TransportKind {
    /// The default transport.
    #[default]
    Abridged,
    Intermediate,
    Full,
    /// Obfuscated transport with an optional 16-byte secret.
    Obfuscated { secret: Option<[u8; 16]> },
}
337
/// Token returned by `Client::connect`; cancelling it makes the reader task
/// fail outstanding requests with `InvocationError::Dropped` and exit.
pub type ShutdownToken = CancellationToken;
358
/// Settings consumed by `Client::connect`.
#[derive(Clone)]
pub struct Config {
    /// API id, sent with `auth.importBotAuthorization`.
    pub api_id: i32,
    /// API hash paired with `api_id`.
    pub api_hash: String,
    // NOTE(review): `dc_addr` is not read in the visible part of this file —
    // presumably an override for the initial DC address; confirm.
    pub dc_addr: Option<String>,
    /// Retry policy; defaults to `AutoSleep`.
    pub retry_policy: Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy settings passed to every connection attempt.
    pub socks5: Option<crate::socks5::Socks5Config>,
    // NOTE(review): stored on the client, but its use is outside this view.
    pub allow_ipv6: bool,
    /// Transport framing to use; defaults to `TransportKind::Abridged`.
    pub transport: TransportKind,
    /// Where the session is loaded from and saved to; defaults to a
    /// `BinaryFileBackend` on `"layer.session"`.
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// When `true`, `connect` spawns a task that fetches the update difference
    /// and forwards the missed updates to the update channel.
    pub catch_up: bool,
}
379
380impl Default for Config {
381 fn default() -> Self {
382 Self {
383 api_id: 0,
384 api_hash: String::new(),
385 dc_addr: None,
386 retry_policy: Arc::new(AutoSleep::default()),
387 socks5: None,
388 allow_ipv6: false,
389 transport: TransportKind::Abridged,
390 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
391 catch_up: false,
392 }
393 }
394}
395
/// Receiving end of the update channel handed out by `Client::stream_updates`.
pub struct UpdateStream {
    /// Receives updates forwarded by the task that `stream_updates` spawns.
    rx: mpsc::UnboundedReceiver<update::Update>,
}
402
403impl UpdateStream {
404 pub async fn next(&mut self) -> Option<update::Update> {
406 self.rx.recv().await
407 }
408}
409
/// A dialog entry together with the objects resolved for it.
#[derive(Debug, Clone)]
pub struct Dialog {
    /// The raw dialog object as received.
    pub raw: tl::enums::Dialog,
    /// Message associated with this dialog, when available.
    pub message: Option<tl::enums::Message>,
    /// The user behind the dialog, when it is a user dialog.
    pub entity: Option<tl::enums::User>,
    /// The chat or channel behind the dialog, when it is not a user dialog.
    pub chat: Option<tl::enums::Chat>,
}
420
421impl Dialog {
422 pub fn title(&self) -> String {
424 if let Some(tl::enums::User::User(u)) = &self.entity {
425 let first = u.first_name.as_deref().unwrap_or("");
426 let last = u.last_name.as_deref().unwrap_or("");
427 let name = format!("{first} {last}").trim().to_string();
428 if !name.is_empty() { return name; }
429 }
430 if let Some(chat) = &self.chat {
431 return match chat {
432 tl::enums::Chat::Chat(c) => c.title.clone(),
433 tl::enums::Chat::Forbidden(c) => c.title.clone(),
434 tl::enums::Chat::Channel(c) => c.title.clone(),
435 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
436 tl::enums::Chat::Empty(_) => "(empty)".into(),
437 };
438 }
439 "(Unknown)".to_string()
440 }
441
442 pub fn peer(&self) -> Option<&tl::enums::Peer> {
444 match &self.raw {
445 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
446 tl::enums::Dialog::Folder(_) => None,
447 }
448 }
449
450 pub fn unread_count(&self) -> i32 {
452 match &self.raw {
453 tl::enums::Dialog::Dialog(d) => d.unread_count,
454 _ => 0,
455 }
456 }
457
458 pub fn top_message(&self) -> i32 {
460 match &self.raw {
461 tl::enums::Dialog::Dialog(d) => d.top_message,
462 _ => 0,
463 }
464 }
465}
466
/// State shared by every clone of `Client` (held behind an `Arc`).
struct ClientInner {
    /// Write side of the current connection, including its encryption state (`enc`).
    writer: Mutex<ConnectionWriter>,
    /// Outstanding requests: message id → channel on which the raw response
    /// (or an error) is delivered by the reader task.
    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Hands a replacement connection (read half, frame kind, auth key,
    /// session id) to the reader task.
    reconnect_tx: mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// Signals the reader's reconnect loop to skip its backoff sleep.
    network_hint_tx: mpsc::UnboundedSender<()>,
    /// Clone of the shutdown token returned by `connect`.
    #[allow(dead_code)]
    shutdown_token: CancellationToken,
    /// Copy of `Config::catch_up`.
    #[allow(dead_code)]
    catch_up: bool,
    /// Id of the DC the main connection talks to.
    home_dc_id: Mutex<i32>,
    /// Known DCs by id (address, optional auth key, salt, time offset).
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache for users and channels.
    pub(crate) peer_cache: Mutex<PeerCache>,
    /// Update sequence state (see the `pts` module).
    pub(crate) pts_state: Mutex<pts::PtsState>,
    api_id: i32,
    api_hash: String,
    retry_policy: Arc<dyn RetryPolicy>,
    socks5: Option<crate::socks5::Socks5Config>,
    allow_ipv6: bool,
    transport: TransportKind,
    /// Backend used by `save_session`.
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Pool of per-DC connections; also contributes keys in `save_session`.
    dc_pool: Mutex<dc_pool::DcPool>,
    /// Sender side of the internal update channel.
    update_tx: mpsc::UnboundedSender<update::Update>,
}
504
/// Cheaply cloneable handle to a connected client; all clones share one `ClientInner`.
#[derive(Clone)]
pub struct Client {
    pub(crate) inner: Arc<ClientInner>,
    /// Receiver of the internal update channel, shared by all clones;
    /// `stream_updates` forwards from it while holding the lock.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
511
512impl Client {
    /// Connects to Telegram and returns the client plus its shutdown token.
    ///
    /// Steps, in order:
    /// 1. Load the persisted session. If it has an entry for its home DC with
    ///    an auth key, connect with that key; on any other outcome (no session,
    ///    no entry, no key, connect failure) fall back to `fresh_connect`.
    /// 2. Build the DC pool and split the connection into writer and read half.
    /// 3. Spawn the reader task, then run `init_connection`. If that fails,
    ///    do a fresh connect, swap the writer, hand the new read half to the
    ///    reader task and run `init_connection` once more.
    /// 4. Sync the pts state (errors ignored) and, when `config.catch_up` is
    ///    set, spawn a task that forwards missed updates.
    ///
    /// # Errors
    /// Returns the error from loading the session, from the fresh connect, or
    /// from the second `init_connection` attempt.
    pub async fn connect(config: Config) -> Result<(Self, ShutdownToken), InvocationError> {
        let (update_tx, update_rx) = mpsc::unbounded_channel();

        let socks5 = config.socks5.clone();
        let transport = config.transport.clone();

        let (conn, home_dc_id, dc_opts) =
            match config.session_backend.load()
                .map_err(InvocationError::Io)?
            {
                Some(s) => {
                    if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
                        if let Some(key) = dc.auth_key {
                            log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
                            match Connection::connect_with_key(
                                &dc.addr, key, dc.first_salt, dc.time_offset,
                                socks5.as_ref(), &transport,
                            ).await {
                                Ok(c) => {
                                    // Start from the built-in DC table, then let the
                                    // persisted entries override it.
                                    let mut opts = session::default_dc_addresses()
                                        .into_iter()
                                        .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
                                        .collect::<HashMap<_, _>>();
                                    for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
                                    (c, s.home_dc_id, opts)
                                }
                                Err(e) => {
                                    log::warn!("[layer] Session connect failed ({e}), fresh connect …");
                                    Self::fresh_connect(socks5.as_ref(), &transport).await?
                                }
                            }
                        } else {
                            Self::fresh_connect(socks5.as_ref(), &transport).await?
                        }
                    } else {
                        Self::fresh_connect(socks5.as_ref(), &transport).await?
                    }
                }
                None => Self::fresh_connect(socks5.as_ref(), &transport).await?,
            };

        let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());

        let (writer, read_half, frame_kind) = conn.into_writer();
        // The reader task needs the key and session id to decrypt frames.
        let auth_key = writer.enc.auth_key_bytes();
        let session_id = writer.enc.session_id();

        let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        let (reconnect_tx, reconnect_rx) =
            mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();

        let (network_hint_tx, network_hint_rx) = mpsc::unbounded_channel::<()>();


        let shutdown_token = CancellationToken::new();
        let catch_up = config.catch_up;

        let inner = Arc::new(ClientInner {
            writer: Mutex::new(writer),
            pending: pending.clone(),
            reconnect_tx,
            network_hint_tx,
            shutdown_token: shutdown_token.clone(),
            catch_up,
            home_dc_id: Mutex::new(home_dc_id),
            dc_options: Mutex::new(dc_opts),
            peer_cache: Mutex::new(PeerCache::default()),
            pts_state: Mutex::new(pts::PtsState::default()),
            api_id: config.api_id,
            api_hash: config.api_hash,
            retry_policy: config.retry_policy,
            socks5: config.socks5,
            allow_ipv6: config.allow_ipv6,
            transport: config.transport,
            session_backend: config.session_backend,
            dc_pool: Mutex::new(pool),
            update_tx: update_tx,
        });

        let client = Self {
            inner,
            _update_rx: Arc::new(Mutex::new(update_rx)),
        };

        {
            // The reader task must be running before `init_connection`, which
            // waits for a response delivered through `pending`.
            let client_r = client.clone();
            let shutdown_r = shutdown_token.clone();
            tokio::spawn(async move {
                client_r.run_reader_task(
                    read_half, frame_kind, auth_key, session_id,
                    reconnect_rx, network_hint_rx, shutdown_r,
                ).await;
            });
        }

        if let Err(e) = client.init_connection().await {
            log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");

            let socks5_r = client.inner.socks5.clone();
            let transport_r = client.inner.transport.clone();
            let (new_conn, new_dc_id, new_opts) =
                Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;

            {
                let mut dc_guard = client.inner.home_dc_id.lock().await;
                *dc_guard = new_dc_id;
            }
            {
                let mut opts_guard = client.inner.dc_options.lock().await;
                *opts_guard = new_opts;
            }

            let (new_writer, new_read, new_fk) = new_conn.into_writer();
            let new_ak = new_writer.enc.auth_key_bytes();
            let new_sid = new_writer.enc.session_id();
            // Swap the writer first, then hand the matching read half to the reader task.
            *client.inner.writer.lock().await = new_writer;
            let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

            client.init_connection().await?;
        }

        // Best effort: a failure to sync pts state does not fail the connect.
        let _ = client.sync_pts_state().await;

        if catch_up {
            let c = client.clone();
            let utx = client.inner.update_tx.clone();
            tokio::spawn(async move {
                if let Ok(missed) = c.get_difference().await {
                    for u in missed { let _ = utx.send(u); }
                }
            });
        }

        Ok((client, shutdown_token))
    }
668
669 async fn fresh_connect(
670 socks5: Option<&crate::socks5::Socks5Config>,
671 transport: &TransportKind,
672 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
673 log::info!("[layer] Fresh connect to DC2 …");
674 let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
675 let opts = session::default_dc_addresses()
676 .into_iter()
677 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
678 .collect();
679 Ok((conn, 2, opts))
680 }
681
682 pub async fn save_session(&self) -> Result<(), InvocationError> {
685 let writer_guard = self.inner.writer.lock().await;
686 let home_dc_id = *self.inner.home_dc_id.lock().await;
687 let dc_options = self.inner.dc_options.lock().await;
688
689 let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
690 dc_id: e.dc_id,
691 addr: e.addr.clone(),
692 auth_key: if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
693 first_salt: if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
694 time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
695 }).collect();
696 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
698
699 self.inner.session_backend
700 .save(&PersistedSession { home_dc_id, dcs })
701 .map_err(InvocationError::Io)?;
702 log::info!("[layer] Session saved ✓");
703 Ok(())
704 }
705
706 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
710 match self.invoke(&tl::functions::updates::GetState {}).await {
711 Ok(_) => Ok(true),
712 Err(e) if e.is("AUTH_KEY_UNREGISTERED")
713 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
714 Err(e) => Err(e),
715 }
716 }
717
718 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
720 let req = tl::functions::auth::ImportBotAuthorization {
721 flags: 0, api_id: self.inner.api_id,
722 api_hash: self.inner.api_hash.clone(),
723 bot_auth_token: token.to_string(),
724 };
725
726 let result = match self.invoke(&req).await {
727 Ok(x) => x,
728 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
729 let dc_id = r.value.unwrap_or(2) as i32;
730 self.migrate_to(dc_id).await?;
731 self.invoke(&req).await?
732 }
733 Err(e) => return Err(e),
734 };
735
736 let name = match result {
737 tl::enums::auth::Authorization::Authorization(a) => {
738 self.cache_user(&a.user).await;
739 Self::extract_user_name(&a.user)
740 }
741 tl::enums::auth::Authorization::SignUpRequired(_) => {
742 panic!("unexpected SignUpRequired during bot sign-in")
743 }
744 };
745 log::info!("[layer] Bot signed in ✓ ({name})");
746 Ok(name)
747 }
748
749 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
751 use tl::enums::auth::SentCode;
752
753 let req = self.make_send_code_req(phone);
754 let body = match self.rpc_call_raw(&req).await {
755 Ok(b) => b,
756 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
757 let dc_id = r.value.unwrap_or(2) as i32;
758 self.migrate_to(dc_id).await?;
759 self.rpc_call_raw(&req).await?
760 }
761 Err(e) => return Err(e),
762 };
763
764 let mut cur = Cursor::from_slice(&body);
765 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
766 SentCode::SentCode(s) => s.phone_code_hash,
767 SentCode::Success(_) => return Err(InvocationError::Deserialize("unexpected Success".into())),
768 SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
769 };
770 log::info!("[layer] Login code sent");
771 Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
772 }
773
774 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
776 let req = tl::functions::auth::SignIn {
777 phone_number: token.phone.clone(),
778 phone_code_hash: token.phone_code_hash.clone(),
779 phone_code: Some(code.trim().to_string()),
780 email_verification: None,
781 };
782
783 let body = match self.rpc_call_raw(&req).await {
784 Ok(b) => b,
785 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
786 let dc_id = r.value.unwrap_or(2) as i32;
787 self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
788 self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
789 }
790 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
791 let t = self.get_password_info().await.map_err(SignInError::Other)?;
792 return Err(SignInError::PasswordRequired(t));
793 }
794 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
795 Err(e) => return Err(SignInError::Other(e)),
796 };
797
798 let mut cur = Cursor::from_slice(&body);
799 match tl::enums::auth::Authorization::deserialize(&mut cur)
800 .map_err(|e| SignInError::Other(e.into()))?
801 {
802 tl::enums::auth::Authorization::Authorization(a) => {
803 self.cache_user(&a.user).await;
804 let name = Self::extract_user_name(&a.user);
805 log::info!("[layer] Signed in ✓ Welcome, {name}!");
806 Ok(name)
807 }
808 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
809 }
810 }
811
812 pub async fn check_password(
814 &self,
815 token: PasswordToken,
816 password: impl AsRef<[u8]>,
817 ) -> Result<String, InvocationError> {
818 let pw = token.password;
819 let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
820 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
821 let g_b = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
822 let a = pw.secure_random;
823 let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
824
825 let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
826 let req = tl::functions::auth::CheckPassword {
827 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
828 tl::types::InputCheckPasswordSrp {
829 srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
830 },
831 ),
832 };
833
834 let body = self.rpc_call_raw(&req).await?;
835 let mut cur = Cursor::from_slice(&body);
836 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
837 tl::enums::auth::Authorization::Authorization(a) => {
838 self.cache_user(&a.user).await;
839 let name = Self::extract_user_name(&a.user);
840 log::info!("[layer] 2FA ✓ Welcome, {name}!");
841 Ok(name)
842 }
843 tl::enums::auth::Authorization::SignUpRequired(_) =>
844 Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
845 }
846 }
847
848 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
850 let req = tl::functions::auth::LogOut {};
851 match self.rpc_call_raw(&req).await {
852 Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
853 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
854 Err(e) => Err(e),
855 }
856 }
857
858 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
862 let req = tl::functions::users::GetUsers {
863 id: vec![tl::enums::InputUser::UserSelf],
864 };
865 let body = self.rpc_call_raw(&req).await?;
866 let mut cur = Cursor::from_slice(&body);
867 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
868 self.cache_users_slice(&users).await;
869 users.into_iter().find_map(|u| match u {
870 tl::enums::User::User(u) => Some(u),
871 _ => None,
872 }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
873 }
874
875 pub fn stream_updates(&self) -> UpdateStream {
883 let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
884 let internal_rx = self._update_rx.clone();
887 tokio::spawn(async move {
888 let mut guard = internal_rx.lock().await;
890 while let Some(upd) = guard.recv().await {
891 if caller_tx.send(upd).is_err() { break; }
892 }
893 });
894 UpdateStream { rx }
895 }
896
897 pub fn signal_network_restored(&self) {
910 let _ = self.inner.network_hint_tx.send(());
911 }
912
    /// Supervisor for the reader: runs `reader_loop` and restarts it on exit.
    ///
    /// Each iteration races `reader_loop` against shutdown. On shutdown, all
    /// pending requests fail with `InvocationError::Dropped` and the task ends.
    /// If `reader_loop` returns while not shut down, the supervisor:
    /// 1. fails all pending requests with a `ConnectionReset` I/O error,
    /// 2. reconnects with a doubling, jittered delay (capped by
    ///    `RECONNECT_MAX_SECS` before jitter), aborting on shutdown,
    /// 3. spawns a task that runs `init_connection` (sleeping through
    ///    FLOOD_WAIT errors), then on success forwards missed updates,
    /// 4. passes the init task's result receiver to the next `reader_loop`.
    async fn run_reader_task(
        &self,
        read_half: OwnedReadHalf,
        frame_kind: FrameKind,
        auth_key: [u8; 256],
        session_id: i64,
        mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
        mut network_hint_rx: mpsc::UnboundedReceiver<()>,
        shutdown_token: CancellationToken,
    ) {
        let mut rh = read_half;
        let mut fk = frame_kind;
        let mut ak = auth_key;
        let mut sid = session_id;
        // Result receiver of the init task spawned after a supervisor reconnect.
        let mut restart_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = None;
        let mut restart_count: u32 = 0;

        loop {
            tokio::select! {
                _ = shutdown_token.cancelled() => {
                    log::info!("[layer] Reader task: shutdown requested, exiting cleanly.");
                    let mut pending = self.inner.pending.lock().await;
                    for (_, tx) in pending.drain() {
                        let _ = tx.send(Err(InvocationError::Dropped));
                    }
                    return;
                }

                _ = self.reader_loop(
                    rh, fk, ak, sid,
                    restart_init_rx.take(),
                    &mut new_conn_rx, &mut network_hint_rx,
                ) => {}
            }

            // `reader_loop` returned; it may have done so because of shutdown.
            if shutdown_token.is_cancelled() {
                log::info!("[layer] Reader task: exiting after loop (shutdown).");
                return;
            }

            restart_count += 1;
            log::error!(
                "[layer] Reader loop exited unexpectedly (restart #{restart_count}) — supervisor reconnecting …"
            );

            {
                // No reader is running, so nothing can answer these requests.
                let mut pending = self.inner.pending.lock().await;
                for (_, tx) in pending.drain() {
                    let _ = tx.send(Err(InvocationError::Io(std::io::Error::new(
                        std::io::ErrorKind::ConnectionReset,
                        "reader task restarted",
                    ))));
                }
            }

            let mut delay_ms = RECONNECT_BASE_MS;
            let new_conn = loop {
                log::info!("[layer] Supervisor: reconnecting in {delay_ms} ms …");
                tokio::select! {
                    _ = shutdown_token.cancelled() => {
                        log::info!("[layer] Supervisor: shutdown during reconnect, exiting.");
                        return;
                    }
                    _ = sleep(Duration::from_millis(delay_ms)) => {}
                }

                // NOTE(review): a zeroed key and `Abridged` are passed as placeholders;
                // how `do_reconnect` uses these arguments is not visible here — confirm.
                let dummy_ak = [0u8; 256];
                let dummy_fk = FrameKind::Abridged;
                match self.do_reconnect(&dummy_ak, &dummy_fk).await {
                    Ok(conn) => break conn,
                    Err(e) => {
                        log::warn!("[layer] Supervisor: reconnect failed ({e})");
                        // Double the delay, cap it, then apply jitter.
                        let next = (delay_ms * 2).min(RECONNECT_MAX_SECS * 1_000);
                        delay_ms = jitter_delay(next).as_millis() as u64;
                    }
                }
            };

            let (new_rh, new_fk, new_ak, new_sid) = new_conn;
            rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;

            let (init_tx, init_rx) = oneshot::channel();
            let c = self.clone();
            let utx = self.inner.update_tx.clone();
            // Runs concurrently with the restarted reader loop, which must be
            // reading frames for `init_connection` to receive its response.
            tokio::spawn(async move {
                let result = loop {
                    match c.init_connection().await {
                        Ok(()) => break Ok(()),
                        Err(InvocationError::Rpc(ref r))
                            if r.flood_wait_seconds().is_some() =>
                        {
                            let secs = r.flood_wait_seconds().unwrap();
                            log::warn!(
                                "[layer] Supervisor init_connection FLOOD_WAIT_{secs} — waiting"
                            );
                            sleep(Duration::from_secs(secs + 1)).await;
                        }
                        Err(e) => break Err(e),
                    }
                };
                if result.is_ok() {
                    if let Ok(missed) = c.get_difference().await {
                        for u in missed { let _ = utx.send(u); }
                    }
                }
                let _ = init_tx.send(result);
            });
            restart_init_rx = Some(init_rx);

            log::info!("[layer] Supervisor: restarting reader loop (restart #{restart_count}) …");
        }
    }
1074
    /// Reads, decrypts and routes frames, and drives reconnection.
    ///
    /// The loop selects over three events:
    /// * a frame outcome from `recv_frame_with_keepalive` — frames are
    ///   decrypted and passed to `route_frame`; an error fails all pending
    ///   requests and enters `do_reconnect_loop`;
    /// * a replacement connection on `new_conn_rx` — the reader switches to
    ///   it; a closed channel ends the loop;
    /// * the result of an in-flight init task (`init_rx`) — success saves the
    ///   session (up to three attempts); failure fails pending requests and
    ///   reconnects immediately, clearing the home DC's auth key first when the
    ///   error is treated as a bad-key signal.
    ///
    /// Returns when `do_reconnect_loop` yields `None` or `new_conn_rx` closes.
    ///
    /// `initial_init_rx` is the receiver of an init task already started by the
    /// caller, if any.
    async fn reader_loop(
        &self,
        mut rh: OwnedReadHalf,
        mut fk: FrameKind,
        mut ak: [u8; 256],
        mut sid: i64,
        initial_init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>>,
        new_conn_rx: &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
        network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
    ) {
        let mut init_rx: Option<oneshot::Receiver<Result<(), InvocationError>>> = initial_init_rx;
        // Consecutive transient init failures; only used in log output here.
        let mut init_fail_count: u32 = 0;

        loop {
            tokio::select! {
                outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
                    match outcome {
                        FrameOutcome::Frame(mut raw) => {
                            let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
                                Ok(m) => m,
                                Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
                            };
                            // Adopt the salt seen on incoming messages for outgoing ones.
                            if msg.salt != 0 {
                                self.inner.writer.lock().await.enc.salt = msg.salt;
                            }
                            self.route_frame(msg.body).await;
                        }

                        FrameOutcome::Error(e) => {
                            log::warn!("[layer] Reader: connection error: {e}");
                            // Discard any in-flight init result, then fail every pending request.
                            drop(init_rx.take()); {
                                let mut pending = self.inner.pending.lock().await;
                                let msg = e.to_string();
                                for (_, tx) in pending.drain() {
                                    let _ = tx.send(Err(InvocationError::Io(
                                        std::io::Error::new(
                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
                                }
                            }

                            match self.do_reconnect_loop(
                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
                                network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None => return, }
                        }

                        FrameOutcome::Keepalive => {} }
                }

                maybe = new_conn_rx.recv() => {
                    if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
                        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
                        log::info!("[layer] Reader: switched to new connection.");
                    } else {
                        break; }
                }


                // Only polled while an init task is in flight; the `unwrap` is
                // guarded by the `if init_rx.is_some()` precondition.
                init_result = async { init_rx.as_mut().unwrap().await }, if init_rx.is_some() => {
                    init_rx = None;
                    match init_result {
                        Ok(Ok(())) => {
                            init_fail_count = 0;
                            log::info!("[layer] Reconnected to Telegram ✓ — session live, replaying missed updates …");
                            // Persist the session, retrying up to three times.
                            for attempt in 1u8..=3 {
                                match self.save_session().await {
                                    Ok(()) => break,
                                    Err(e) if attempt < 3 => {
                                        log::warn!(
                                            "[layer] save_session failed (attempt {attempt}/3): {e}"
                                        );
                                        sleep(Duration::from_millis(500)).await;
                                    }
                                    Err(e) => {
                                        log::error!(
                                            "[layer] save_session permanently failed after 3 attempts: {e}"
                                        );
                                    }
                                }
                            }
                        }

                        Ok(Err(e)) => {
                            // RPC code -404, or an EOF / connection reset, is treated
                            // as a sign that the stored auth key is no longer valid.
                            let key_is_stale = match &e {
                                InvocationError::Rpc(r) if r.code == -404 => true,
                                InvocationError::Io(io) if io.kind() == std::io::ErrorKind::UnexpectedEof
                                    || io.kind() == std::io::ErrorKind::ConnectionReset => true,
                                _ => false,
                            };

                            if key_is_stale {
                                log::warn!(
                                    "[layer] init_connection failed with definitive bad-key signal ({e}) \
                                    — clearing auth key for fresh DH …"
                                );
                                init_fail_count = 0;
                                let home_dc_id = *self.inner.home_dc_id.lock().await;
                                let mut opts = self.inner.dc_options.lock().await;
                                if let Some(entry) = opts.get_mut(&home_dc_id) {
                                    entry.auth_key = None;
                                }
                            } else {
                                init_fail_count += 1;
                                log::warn!(
                                    "[layer] init_connection failed transiently (attempt {init_fail_count}, {e}) \
                                    — retrying with same key …"
                                );
                            }
                            {
                                let mut pending = self.inner.pending.lock().await;
                                let msg = e.to_string();
                                for (_, tx) in pending.drain() {
                                    let _ = tx.send(Err(InvocationError::Io(
                                        std::io::Error::new(
                                            std::io::ErrorKind::ConnectionReset, msg.clone()))));
                                }
                            }
                            // Initial delay 0: reconnect without waiting.
                            match self.do_reconnect_loop(
                                0, &mut rh, &mut fk, &mut ak, &mut sid, network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None => return,
                            }
                        }

                        // The init task dropped its sender without reporting a result.
                        Err(_) => {
                            log::warn!("[layer] init_connection task dropped unexpectedly, reconnecting …");
                            match self.do_reconnect_loop(
                                RECONNECT_BASE_MS, &mut rh, &mut fk, &mut ak, &mut sid,
                                network_hint_rx,
                            ).await {
                                Some(rx) => { init_rx = Some(rx); }
                                None => return,
                            }
                        }
                    }
                }
            }
        }
    }
1253
1254 async fn route_frame(&self, body: Vec<u8>) {
1256 if body.len() < 4 { return; }
1257 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
1258
1259 match cid {
1260 ID_RPC_RESULT => {
1261 if body.len() < 12 { return; }
1263 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1264 let inner = body[12..].to_vec();
1265 let result = unwrap_envelope(inner);
1267 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
1268 let to_send = match result {
1269 Ok(EnvelopeResult::Payload(p)) => Ok(p),
1270 Ok(EnvelopeResult::Updates(us)) => {
1271 for u in us { let _ = self.inner.update_tx.send(u); }
1273 Ok(vec![])
1274 }
1275 Ok(EnvelopeResult::None) => Ok(vec![]),
1276 Err(e) => Err(e),
1277 };
1278 let _ = tx.send(to_send);
1279 }
1280 }
1281 ID_RPC_ERROR => {
1285 log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
1286 }
1287 ID_MSG_CONTAINER => {
1288 if body.len() < 8 { return; }
1290 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1291 let mut pos = 8usize;
1292 for _ in 0..count {
1293 if pos + 16 > body.len() { break; }
1294 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1295 pos += 16;
1296 if pos + inner_len > body.len() { break; }
1297 let inner = body[pos..pos + inner_len].to_vec();
1298 pos += inner_len;
1299 Box::pin(self.route_frame(inner)).await;
1300 }
1301 }
1302 ID_GZIP_PACKED => {
1303 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
1304 if let Ok(inflated) = gz_inflate(&bytes) {
1305 Box::pin(self.route_frame(inflated)).await;
1306 }
1307 }
1308 ID_BAD_SERVER_SALT => {
1309 if body.len() >= 16 {
1311 let new_salt = i64::from_le_bytes(body[8..16].try_into().unwrap());
1312 self.inner.writer.lock().await.enc.salt = new_salt;
1313 }
1314 }
1315 ID_PONG => {
1316 if body.len() >= 20 {
1320 let ping_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
1321 if let Some(tx) = self.inner.pending.lock().await.remove(&ping_msg_id) {
1322 let _ = tx.send(Ok(body));
1323 }
1324 }
1325 }
1326 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
1327 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
1328 | ID_UPDATES_TOO_LONG => {
1329 for u in update::parse_updates(&body) {
1330 let _ = self.inner.update_tx.send(u);
1331 }
1332 }
1333 _ => {}
1335 }
1336 }
1337
1338 async fn do_reconnect_loop(
1348 &self,
1349 initial_delay_ms: u64,
1350 rh: &mut OwnedReadHalf,
1351 fk: &mut FrameKind,
1352 ak: &mut [u8; 256],
1353 sid: &mut i64,
1354 network_hint_rx: &mut mpsc::UnboundedReceiver<()>,
1355 ) -> Option<oneshot::Receiver<Result<(), InvocationError>>> {
1356 let mut delay_ms = if initial_delay_ms == 0 {
1357 0
1360 } else {
1361 initial_delay_ms.max(RECONNECT_BASE_MS)
1362 };
1363 loop {
1364 log::info!("[layer] Reconnecting in {delay_ms} ms …");
1365 tokio::select! {
1366 _ = sleep(Duration::from_millis(delay_ms)) => {}
1367 hint = network_hint_rx.recv() => {
1368 if hint.is_none() { return None; } log::info!("[layer] Network hint → skipping backoff, reconnecting now");
1370 }
1371 }
1372
1373 match self.do_reconnect(ak, fk).await {
1374 Ok((new_rh, new_fk, new_ak, new_sid)) => {
1375 *rh = new_rh; *fk = new_fk; *ak = new_ak; *sid = new_sid;
1376 log::info!("[layer] TCP reconnected ✓ — initialising session …");
1377
1378 let (init_tx, init_rx) = oneshot::channel();
1382 let c = self.clone();
1383 let utx = self.inner.update_tx.clone();
1384 tokio::spawn(async move {
1385 let result = loop {
1390 match c.init_connection().await {
1391 Ok(()) => break Ok(()),
1392 Err(InvocationError::Rpc(ref r))
1393 if r.flood_wait_seconds().is_some() =>
1394 {
1395 let secs = r.flood_wait_seconds().unwrap();
1396 log::warn!(
1397 "[layer] init_connection FLOOD_WAIT_{secs} — waiting before retry"
1398 );
1399 sleep(Duration::from_secs(secs + 1)).await;
1400 }
1402 Err(e) => break Err(e),
1403 }
1404 };
1405 if result.is_ok() {
1406 if let Ok(missed) = c.get_difference().await {
1408 for u in missed { let _ = utx.send(u); }
1409 }
1410 }
1411 let _ = init_tx.send(result);
1412 });
1413 return Some(init_rx);
1414 }
1415 Err(e) => {
1416 log::warn!("[layer] Reconnect attempt failed: {e}");
1417 let next = (delay_ms.saturating_mul(2).max(RECONNECT_BASE_MS))
1421 .min(RECONNECT_MAX_SECS * 1_000);
1422 delay_ms = jitter_delay(next).as_millis() as u64;
1423 }
1424 }
1425 }
1426 }
1427
1428 async fn do_reconnect(
1430 &self,
1431 _old_auth_key: &[u8; 256],
1432 _old_frame_kind: &FrameKind,
1433 ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
1434 let home_dc_id = *self.inner.home_dc_id.lock().await;
1435 let (addr, saved_key, first_salt, time_offset) = {
1436 let opts = self.inner.dc_options.lock().await;
1437 match opts.get(&home_dc_id) {
1438 Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
1439 None => ("149.154.167.51:443".to_string(), None, 0, 0),
1440 }
1441 };
1442 let socks5 = self.inner.socks5.clone();
1443 let transport = self.inner.transport.clone();
1444
1445 let new_conn = if let Some(key) = saved_key {
1446 log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
1447 match Connection::connect_with_key(
1448 &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
1449 ).await {
1450 Ok(c) => c,
1451 Err(e2) => {
1452 log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
1453 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
1454 }
1455 }
1456 } else {
1457 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
1458 };
1459
1460 let (new_writer, new_read, new_fk) = new_conn.into_writer();
1461 let new_ak = new_writer.enc.auth_key_bytes();
1462 let new_sid = new_writer.enc.session_id();
1463 *self.inner.writer.lock().await = new_writer;
1464
1465 Ok((new_read, new_fk, new_ak, new_sid))
1477 }
1478
1479 pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
1483 let p = self.resolve_peer(peer).await?;
1484 self.send_message_to_peer(p, text).await
1485 }
1486
1487 pub async fn send_message_to_peer(
1489 &self,
1490 peer: tl::enums::Peer,
1491 text: &str,
1492 ) -> Result<(), InvocationError> {
1493 self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
1494 }
1495
1496 pub async fn send_message_to_peer_ex(
1498 &self,
1499 peer: tl::enums::Peer,
1500 msg: &InputMessage,
1501 ) -> Result<(), InvocationError> {
1502 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1503 let req = tl::functions::messages::SendMessage {
1504 no_webpage: msg.no_webpage,
1505 silent: msg.silent,
1506 background: msg.background,
1507 clear_draft: msg.clear_draft,
1508 noforwards: false,
1509 update_stickersets_order: false,
1510 invert_media: false,
1511 allow_paid_floodskip: false,
1512 peer: input_peer,
1513 reply_to: msg.reply_header(),
1514 message: msg.text.clone(),
1515 random_id: random_i64(),
1516 reply_markup: msg.reply_markup.clone(),
1517 entities: msg.entities.clone(),
1518 schedule_date: msg.schedule_date,
1519 schedule_repeat_period: None,
1520 send_as: None,
1521 quick_reply_shortcut: None,
1522 effect: None,
1523 allow_paid_stars: None,
1524 suggested_post: None,
1525 };
1526 self.rpc_write(&req).await
1527 }
1528
1529 pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
1531 let req = tl::functions::messages::SendMessage {
1532 no_webpage: false,
1533 silent: false,
1534 background: false,
1535 clear_draft: false,
1536 noforwards: false,
1537 update_stickersets_order: false,
1538 invert_media: false,
1539 allow_paid_floodskip: false,
1540 peer: tl::enums::InputPeer::PeerSelf,
1541 reply_to: None,
1542 message: text.to_string(),
1543 random_id: random_i64(),
1544 reply_markup: None,
1545 entities: None,
1546 schedule_date: None,
1547 schedule_repeat_period: None,
1548 send_as: None,
1549 quick_reply_shortcut: None,
1550 effect: None,
1551 allow_paid_stars: None,
1552 suggested_post: None,
1553 };
1554 self.rpc_write(&req).await
1555 }
1556
1557 pub async fn edit_message(
1559 &self,
1560 peer: tl::enums::Peer,
1561 message_id: i32,
1562 new_text: &str,
1563 ) -> Result<(), InvocationError> {
1564 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1565 let req = tl::functions::messages::EditMessage {
1566 no_webpage: false,
1567 invert_media: false,
1568 peer: input_peer,
1569 id: message_id,
1570 message: Some(new_text.to_string()),
1571 media: None,
1572 reply_markup: None,
1573 entities: None,
1574 schedule_date: None,
1575 schedule_repeat_period: None,
1576 quick_reply_shortcut_id: None,
1577 };
1578 self.rpc_write(&req).await
1579 }
1580
1581 pub async fn forward_messages(
1583 &self,
1584 destination: tl::enums::Peer,
1585 message_ids: &[i32],
1586 source: tl::enums::Peer,
1587 ) -> Result<(), InvocationError> {
1588 let cache = self.inner.peer_cache.lock().await;
1589 let to_peer = cache.peer_to_input(&destination);
1590 let from_peer = cache.peer_to_input(&source);
1591 drop(cache);
1592
1593 let req = tl::functions::messages::ForwardMessages {
1594 silent: false,
1595 background: false,
1596 with_my_score: false,
1597 drop_author: false,
1598 drop_media_captions: false,
1599 noforwards: false,
1600 from_peer: from_peer,
1601 id: message_ids.to_vec(),
1602 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
1603 to_peer: to_peer,
1604 top_msg_id: None,
1605 reply_to: None,
1606 schedule_date: None,
1607 schedule_repeat_period: None,
1608 send_as: None,
1609 quick_reply_shortcut: None,
1610 effect: None,
1611 video_timestamp: None,
1612 allow_paid_stars: None,
1613 allow_paid_floodskip: false,
1614 suggested_post: None,
1615 };
1616 self.rpc_write(&req).await
1617 }
1618
1619 pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
1621 let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
1622 self.rpc_write(&req).await
1623 }
1624
1625 pub async fn get_messages_by_id(
1627 &self,
1628 peer: tl::enums::Peer,
1629 ids: &[i32],
1630 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1631 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1632 let id_list: Vec<tl::enums::InputMessage> = ids.iter()
1633 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
1634 .collect();
1635 let req = tl::functions::channels::GetMessages {
1636 channel: match &input_peer {
1637 tl::enums::InputPeer::Channel(c) =>
1638 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1639 channel_id: c.channel_id, access_hash: c.access_hash
1640 }),
1641 _ => return self.get_messages_user(input_peer, id_list).await,
1642 },
1643 id: id_list,
1644 };
1645 let body = self.rpc_call_raw(&req).await?;
1646 let mut cur = Cursor::from_slice(&body);
1647 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1648 tl::enums::messages::Messages::Messages(m) => m.messages,
1649 tl::enums::messages::Messages::Slice(m) => m.messages,
1650 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1651 tl::enums::messages::Messages::NotModified(_) => vec![],
1652 };
1653 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1654 }
1655
1656 async fn get_messages_user(
1657 &self,
1658 _peer: tl::enums::InputPeer,
1659 ids: Vec<tl::enums::InputMessage>,
1660 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1661 let req = tl::functions::messages::GetMessages { id: ids };
1662 let body = self.rpc_call_raw(&req).await?;
1663 let mut cur = Cursor::from_slice(&body);
1664 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1665 tl::enums::messages::Messages::Messages(m) => m.messages,
1666 tl::enums::messages::Messages::Slice(m) => m.messages,
1667 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1668 tl::enums::messages::Messages::NotModified(_) => vec![],
1669 };
1670 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1671 }
1672
1673 pub async fn get_pinned_message(
1675 &self,
1676 peer: tl::enums::Peer,
1677 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
1678 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1679 let req = tl::functions::messages::Search {
1680 peer: input_peer,
1681 q: String::new(),
1682 from_id: None,
1683 saved_peer_id: None,
1684 saved_reaction: None,
1685 top_msg_id: None,
1686 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
1687 min_date: 0,
1688 max_date: 0,
1689 offset_id: 0,
1690 add_offset: 0,
1691 limit: 1,
1692 max_id: 0,
1693 min_id: 0,
1694 hash: 0,
1695 };
1696 let body = self.rpc_call_raw(&req).await?;
1697 let mut cur = Cursor::from_slice(&body);
1698 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1699 tl::enums::messages::Messages::Messages(m) => m.messages,
1700 tl::enums::messages::Messages::Slice(m) => m.messages,
1701 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1702 tl::enums::messages::Messages::NotModified(_) => vec![],
1703 };
1704 Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
1705 }
1706
1707 pub async fn pin_message(
1709 &self,
1710 peer: tl::enums::Peer,
1711 message_id: i32,
1712 silent: bool,
1713 unpin: bool,
1714 pm_oneside: bool,
1715 ) -> Result<(), InvocationError> {
1716 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1717 let req = tl::functions::messages::UpdatePinnedMessage {
1718 silent,
1719 unpin,
1720 pm_oneside,
1721 peer: input_peer,
1722 id: message_id,
1723 };
1724 self.rpc_write(&req).await
1725 }
1726
1727 pub async fn unpin_message(
1729 &self,
1730 peer: tl::enums::Peer,
1731 message_id: i32,
1732 ) -> Result<(), InvocationError> {
1733 self.pin_message(peer, message_id, true, true, false).await
1734 }
1735
1736 pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1738 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1739 let req = tl::functions::messages::UnpinAllMessages {
1740 peer: input_peer,
1741 top_msg_id: None,
1742 saved_peer_id: None,
1743 };
1744 self.rpc_write(&req).await
1745 }
1746
1747 pub async fn search_messages(
1751 &self,
1752 peer: tl::enums::Peer,
1753 query: &str,
1754 limit: i32,
1755 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1756 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1757 let req = tl::functions::messages::Search {
1758 peer: input_peer,
1759 q: query.to_string(),
1760 from_id: None,
1761 saved_peer_id: None,
1762 saved_reaction: None,
1763 top_msg_id: None,
1764 filter: tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1765 min_date: 0,
1766 max_date: 0,
1767 offset_id: 0,
1768 add_offset: 0,
1769 limit,
1770 max_id: 0,
1771 min_id: 0,
1772 hash: 0,
1773 };
1774 let body = self.rpc_call_raw(&req).await?;
1775 let mut cur = Cursor::from_slice(&body);
1776 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1777 tl::enums::messages::Messages::Messages(m) => m.messages,
1778 tl::enums::messages::Messages::Slice(m) => m.messages,
1779 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1780 tl::enums::messages::Messages::NotModified(_) => vec![],
1781 };
1782 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1783 }
1784
1785 pub async fn search_global(
1787 &self,
1788 query: &str,
1789 limit: i32,
1790 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1791 let req = tl::functions::messages::SearchGlobal {
1792 broadcasts_only: false,
1793 groups_only: false,
1794 users_only: false,
1795 folder_id: None,
1796 q: query.to_string(),
1797 filter: tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1798 min_date: 0,
1799 max_date: 0,
1800 offset_rate: 0,
1801 offset_peer: tl::enums::InputPeer::Empty,
1802 offset_id: 0,
1803 limit,
1804 };
1805 let body = self.rpc_call_raw(&req).await?;
1806 let mut cur = Cursor::from_slice(&body);
1807 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1808 tl::enums::messages::Messages::Messages(m) => m.messages,
1809 tl::enums::messages::Messages::Slice(m) => m.messages,
1810 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1811 tl::enums::messages::Messages::NotModified(_) => vec![],
1812 };
1813 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1814 }
1815
1816 pub async fn get_scheduled_messages(
1833 &self,
1834 peer: tl::enums::Peer,
1835 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1836 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1837 let req = tl::functions::messages::GetScheduledHistory {
1838 peer: input_peer,
1839 hash: 0,
1840 };
1841 let body = self.rpc_call_raw(&req).await?;
1842 let mut cur = Cursor::from_slice(&body);
1843 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1844 tl::enums::messages::Messages::Messages(m) => m.messages,
1845 tl::enums::messages::Messages::Slice(m) => m.messages,
1846 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1847 tl::enums::messages::Messages::NotModified(_) => vec![],
1848 };
1849 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1850 }
1851
1852 pub async fn delete_scheduled_messages(
1854 &self,
1855 peer: tl::enums::Peer,
1856 ids: Vec<i32>,
1857 ) -> Result<(), InvocationError> {
1858 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1859 let req = tl::functions::messages::DeleteScheduledMessages {
1860 peer: input_peer,
1861 id: ids,
1862 };
1863 self.rpc_write(&req).await
1864 }
1865
1866 pub async fn answer_callback_query(
1869 &self,
1870 query_id: i64,
1871 text: Option<&str>,
1872 alert: bool,
1873 ) -> Result<bool, InvocationError> {
1874 let req = tl::functions::messages::SetBotCallbackAnswer {
1875 alert,
1876 query_id,
1877 message: text.map(|s| s.to_string()),
1878 url: None,
1879 cache_time: 0,
1880 };
1881 let body = self.rpc_call_raw(&req).await?;
1882 Ok(!body.is_empty())
1883 }
1884
1885 pub async fn answer_inline_query(
1886 &self,
1887 query_id: i64,
1888 results: Vec<tl::enums::InputBotInlineResult>,
1889 cache_time: i32,
1890 is_personal: bool,
1891 next_offset: Option<String>,
1892 ) -> Result<bool, InvocationError> {
1893 let req = tl::functions::messages::SetInlineBotResults {
1894 gallery: false,
1895 private: is_personal,
1896 query_id,
1897 results,
1898 cache_time,
1899 next_offset,
1900 switch_pm: None,
1901 switch_webview: None,
1902 };
1903 let body = self.rpc_call_raw(&req).await?;
1904 Ok(!body.is_empty())
1905 }
1906
1907 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
1911 let req = tl::functions::messages::GetDialogs {
1912 exclude_pinned: false,
1913 folder_id: None,
1914 offset_date: 0,
1915 offset_id: 0,
1916 offset_peer: tl::enums::InputPeer::Empty,
1917 limit,
1918 hash: 0,
1919 };
1920
1921 let body = self.rpc_call_raw(&req).await?;
1922 let mut cur = Cursor::from_slice(&body);
1923 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1924 tl::enums::messages::Dialogs::Dialogs(d) => d,
1925 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
1926 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1927 },
1928 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1929 };
1930
1931 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1933 .filter_map(|m| {
1934 let id = match &m {
1935 tl::enums::Message::Message(x) => x.id,
1936 tl::enums::Message::Service(x) => x.id,
1937 tl::enums::Message::Empty(x) => x.id,
1938 };
1939 Some((id, m))
1940 })
1941 .collect();
1942
1943 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1945 .filter_map(|u| {
1946 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1947 })
1948 .collect();
1949
1950 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1952 .filter_map(|c| {
1953 let id = match &c {
1954 tl::enums::Chat::Chat(x) => x.id,
1955 tl::enums::Chat::Forbidden(x) => x.id,
1956 tl::enums::Chat::Channel(x) => x.id,
1957 tl::enums::Chat::ChannelForbidden(x) => x.id,
1958 tl::enums::Chat::Empty(x) => x.id,
1959 };
1960 Some((id, c))
1961 })
1962 .collect();
1963
1964 {
1966 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1967 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1968 self.cache_users_slice(&u_list).await;
1969 self.cache_chats_slice(&c_list).await;
1970 }
1971
1972 let result = raw.dialogs.into_iter().map(|d| {
1973 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1974 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1975
1976 let message = msg_map.get(&top_id).cloned();
1977 let entity = peer.and_then(|p| match p {
1978 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1979 _ => None,
1980 });
1981 let chat = peer.and_then(|p| match p {
1982 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
1983 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1984 _ => None,
1985 });
1986
1987 Dialog { raw: d, message, entity, chat }
1988 }).collect();
1989
1990 Ok(result)
1991 }
1992
1993 #[allow(dead_code)]
1995 async fn get_dialogs_raw(
1996 &self,
1997 req: tl::functions::messages::GetDialogs,
1998 ) -> Result<Vec<Dialog>, InvocationError> {
1999 let body = self.rpc_call_raw(&req).await?;
2000 let mut cur = Cursor::from_slice(&body);
2001 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2002 tl::enums::messages::Dialogs::Dialogs(d) => d,
2003 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
2004 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2005 },
2006 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
2007 };
2008
2009 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2010 .filter_map(|m| {
2011 let id = match &m {
2012 tl::enums::Message::Message(x) => x.id,
2013 tl::enums::Message::Service(x) => x.id,
2014 tl::enums::Message::Empty(x) => x.id,
2015 };
2016 Some((id, m))
2017 })
2018 .collect();
2019
2020 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2021 .filter_map(|u| {
2022 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2023 })
2024 .collect();
2025
2026 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2027 .filter_map(|c| {
2028 let id = match &c {
2029 tl::enums::Chat::Chat(x) => x.id,
2030 tl::enums::Chat::Forbidden(x) => x.id,
2031 tl::enums::Chat::Channel(x) => x.id,
2032 tl::enums::Chat::ChannelForbidden(x) => x.id,
2033 tl::enums::Chat::Empty(x) => x.id,
2034 };
2035 Some((id, c))
2036 })
2037 .collect();
2038
2039 {
2040 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2041 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2042 self.cache_users_slice(&u_list).await;
2043 self.cache_chats_slice(&c_list).await;
2044 }
2045
2046 let result = raw.dialogs.into_iter().map(|d| {
2047 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2048 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2049
2050 let message = msg_map.get(&top_id).cloned();
2051 let entity = peer.and_then(|p| match p {
2052 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2053 _ => None,
2054 });
2055 let chat = peer.and_then(|p| match p {
2056 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2057 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2058 _ => None,
2059 });
2060
2061 Dialog { raw: d, message, entity, chat }
2062 }).collect();
2063
2064 Ok(result)
2065 }
2066
2067 async fn get_dialogs_raw_with_count(
2069 &self,
2070 req: tl::functions::messages::GetDialogs,
2071 ) -> Result<(Vec<Dialog>, Option<i32>), InvocationError> {
2072 let body = self.rpc_call_raw(&req).await?;
2073 let mut cur = Cursor::from_slice(&body);
2074 let (raw, count) = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
2075 tl::enums::messages::Dialogs::Dialogs(d) => (d, None),
2076 tl::enums::messages::Dialogs::Slice(d) => {
2077 let cnt = Some(d.count);
2078 (tl::types::messages::Dialogs {
2079 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
2080 }, cnt)
2081 }
2082 tl::enums::messages::Dialogs::NotModified(_) => return Ok((vec![], None)),
2083 };
2084
2085 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
2086 .filter_map(|m| {
2087 let id = match &m {
2088 tl::enums::Message::Message(x) => x.id,
2089 tl::enums::Message::Service(x) => x.id,
2090 tl::enums::Message::Empty(x) => x.id,
2091 };
2092 Some((id, m))
2093 }).collect();
2094
2095 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
2096 .filter_map(|u| {
2097 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
2098 }).collect();
2099
2100 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
2101 .filter_map(|c| {
2102 let id = match &c {
2103 tl::enums::Chat::Chat(x) => x.id,
2104 tl::enums::Chat::Forbidden(x) => x.id,
2105 tl::enums::Chat::Channel(x) => x.id,
2106 tl::enums::Chat::ChannelForbidden(x) => x.id,
2107 tl::enums::Chat::Empty(x) => x.id,
2108 };
2109 Some((id, c))
2110 }).collect();
2111
2112 {
2113 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
2114 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
2115 self.cache_users_slice(&u_list).await;
2116 self.cache_chats_slice(&c_list).await;
2117 }
2118
2119 let result = raw.dialogs.into_iter().map(|d| {
2120 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
2121 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
2122 let message = msg_map.get(&top_id).cloned();
2123 let entity = peer.and_then(|p| match p {
2124 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
2125 _ => None,
2126 });
2127 let chat = peer.and_then(|p| match p {
2128 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
2129 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
2130 _ => None,
2131 });
2132 Dialog { raw: d, message, entity, chat }
2133 }).collect();
2134
2135 Ok((result, count))
2136 }
2137
2138 async fn get_messages_with_count(
2140 &self,
2141 peer: tl::enums::InputPeer,
2142 limit: i32,
2143 offset_id: i32,
2144 ) -> Result<(Vec<update::IncomingMessage>, Option<i32>), InvocationError> {
2145 let req = tl::functions::messages::GetHistory {
2146 peer, offset_id, offset_date: 0, add_offset: 0,
2147 limit, max_id: 0, min_id: 0, hash: 0,
2148 };
2149 let body = self.rpc_call_raw(&req).await?;
2150 let mut cur = Cursor::from_slice(&body);
2151 let (msgs, count) = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2152 tl::enums::messages::Messages::Messages(m) => (m.messages, None),
2153 tl::enums::messages::Messages::Slice(m) => {
2154 let cnt = Some(m.count);
2155 (m.messages, cnt)
2156 }
2157 tl::enums::messages::Messages::ChannelMessages(m) => (m.messages, Some(m.count)),
2158 tl::enums::messages::Messages::NotModified(_) => (vec![], None),
2159 };
2160 Ok((msgs.into_iter().map(update::IncomingMessage::from_raw).collect(), count))
2161 }
2162
2163 pub async fn download_media_to_file(
2174 &self,
2175 location: tl::enums::InputFileLocation,
2176 path: impl AsRef<std::path::Path>,
2177 ) -> Result<(), InvocationError> {
2178 use tokio::io::AsyncWriteExt as _;
2179 let bytes = self.download_media(location).await?;
2180 let mut f = tokio::fs::File::create(path).await?;
2181 f.write_all(&bytes).await?;
2182 Ok(())
2183 }
2184
2185 pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2186 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2187 let req = tl::functions::messages::DeleteHistory {
2188 just_clear: false,
2189 revoke: false,
2190 peer: input_peer,
2191 max_id: 0,
2192 min_date: None,
2193 max_date: None,
2194 };
2195 self.rpc_write(&req).await
2196 }
2197
2198 pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2200 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2201 match &input_peer {
2202 tl::enums::InputPeer::Channel(c) => {
2203 let req = tl::functions::channels::ReadHistory {
2204 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2205 channel_id: c.channel_id, access_hash: c.access_hash,
2206 }),
2207 max_id: 0,
2208 };
2209 self.rpc_call_raw(&req).await?;
2210 }
2211 _ => {
2212 let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
2213 self.rpc_call_raw(&req).await?;
2214 }
2215 }
2216 Ok(())
2217 }
2218
2219 pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2221 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2222 let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
2223 self.rpc_write(&req).await
2224 }
2225
2226 pub async fn send_chat_action(
2232 &self,
2233 peer: tl::enums::Peer,
2234 action: tl::enums::SendMessageAction,
2235 ) -> Result<(), InvocationError> {
2236 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2237 let req = tl::functions::messages::SetTyping {
2238 peer: input_peer,
2239 top_msg_id: None,
2240 action,
2241 };
2242 self.rpc_write(&req).await
2243 }
2244
2245 pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
2249 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
2250 match input_peer {
2251 tl::enums::InputPeer::Channel(c) => {
2252 let req = tl::functions::channels::JoinChannel {
2253 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
2254 channel_id: c.channel_id, access_hash: c.access_hash,
2255 }),
2256 };
2257 self.rpc_call_raw(&req).await?;
2258 }
2259 tl::enums::InputPeer::Chat(c) => {
2260 let req = tl::functions::messages::AddChatUser {
2261 chat_id: c.chat_id,
2262 user_id: tl::enums::InputUser::UserSelf,
2263 fwd_limit: 0,
2264 };
2265 self.rpc_call_raw(&req).await?;
2266 }
2267 _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
2268 }
2269 Ok(())
2270 }
2271
2272 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
2274 let hash = Self::parse_invite_hash(link)
2275 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
2276 let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
2277 self.rpc_write(&req).await
2278 }
2279
/// Extracts the invite hash from a Telegram invite link.
///
/// Recognised forms, checked in this order:
/// - `…/+<hash>`
/// - `…/joinchat/<hash>`
/// - `…join?invite=<hash>` (the `tg://` deep-link form)
///
/// The hash ends at the first `?`, `#`, `/`, `&` or whitespace character, so
/// trailing query strings, fragments, slashes and newlines are not included.
/// Returns `None` when no marker is present or the hash would be empty.
pub fn parse_invite_hash(link: &str) -> Option<&str> {
    const MARKERS: [&str; 3] = ["/+", "/joinchat/", "join?invite="];

    // First marker (in priority order) that occurs in the link wins.
    let tail = MARKERS.iter().find_map(|marker| {
        link.find(*marker).map(|pos| &link[pos + marker.len()..])
    })?;

    let end = tail
        .find(|c: char| matches!(c, '?' | '#' | '/' | '&') || c.is_whitespace())
        .unwrap_or(tail.len());
    let hash = &tail[..end];

    if hash.is_empty() { None } else { Some(hash) }
}
2290
2291 pub async fn get_messages(
2295 &self,
2296 peer: tl::enums::InputPeer,
2297 limit: i32,
2298 offset_id: i32,
2299 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
2300 let req = tl::functions::messages::GetHistory {
2301 peer, offset_id, offset_date: 0, add_offset: 0,
2302 limit, max_id: 0, min_id: 0, hash: 0,
2303 };
2304 let body = self.rpc_call_raw(&req).await?;
2305 let mut cur = Cursor::from_slice(&body);
2306 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
2307 tl::enums::messages::Messages::Messages(m) => m.messages,
2308 tl::enums::messages::Messages::Slice(m) => m.messages,
2309 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
2310 tl::enums::messages::Messages::NotModified(_) => vec![],
2311 };
2312 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
2313 }
2314
2315 pub async fn resolve_peer(
2319 &self,
2320 peer: &str,
2321 ) -> Result<tl::enums::Peer, InvocationError> {
2322 match peer.trim() {
2323 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
2324 username if username.starts_with('@') => {
2325 self.resolve_username(&username[1..]).await
2326 }
2327 id_str => {
2328 if let Ok(id) = id_str.parse::<i64>() {
2329 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
2330 } else {
2331 Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
2332 }
2333 }
2334 }
2335 }
2336
2337 async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
2338 let req = tl::functions::contacts::ResolveUsername {
2339 username: username.to_string(), referer: None,
2340 };
2341 let body = self.rpc_call_raw(&req).await?;
2342 let mut cur = Cursor::from_slice(&body);
2343 let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
2344 tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
2345 };
2346 self.cache_users_slice(&resolved.users).await;
2348 self.cache_chats_slice(&resolved.chats).await;
2349 Ok(resolved.peer)
2350 }
2351
2352 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
2356 let body = self.rpc_call_raw(req).await?;
2357 let mut cur = Cursor::from_slice(&body);
2358 R::Return::deserialize(&mut cur).map_err(Into::into)
2359 }
2360
2361 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
2362 let mut fail_count = NonZeroU32::new(1).unwrap();
2363 let mut slept_so_far = Duration::default();
2364 loop {
2365 match self.do_rpc_call(req).await {
2366 Ok(body) => return Ok(body),
2367 Err(e) => {
2368 let ctx = RetryContext { fail_count, slept_so_far, error: e };
2369 match self.inner.retry_policy.should_retry(&ctx) {
2370 ControlFlow::Continue(delay) => {
2371 sleep(delay).await;
2372 slept_so_far += delay;
2373 fail_count = fail_count.saturating_add(1);
2374 }
2375 ControlFlow::Break(()) => return Err(ctx.error),
2376 }
2377 }
2378 }
2379 }
2380 }
2381
2382 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
2392 let (tx, rx) = oneshot::channel();
2393 {
2394 let mut w = self.inner.writer.lock().await;
2395 let (wire, msg_id) = w.enc.pack_with_msg_id(req);
2396 let fk = w.frame_kind.clone();
2397 self.inner.pending.lock().await.insert(msg_id, tx);
2400 send_frame_write(&mut w.write_half, &wire, &fk).await?;
2401 }
2402 match tokio::time::timeout(Duration::from_secs(30), rx).await {
2405 Ok(Ok(result)) => result,
2406 Ok(Err(_)) => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
2407 Err(_) => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
2408 }
2409 }
2410
2411 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
2414 let mut fail_count = NonZeroU32::new(1).unwrap();
2415 let mut slept_so_far = Duration::default();
2416 loop {
2417 let result = self.do_rpc_write(req).await;
2418 match result {
2419 Ok(()) => return Ok(()),
2420 Err(e) => {
2421 let ctx = RetryContext { fail_count, slept_so_far, error: e };
2422 match self.inner.retry_policy.should_retry(&ctx) {
2423 ControlFlow::Continue(delay) => {
2424 sleep(delay).await;
2425 slept_so_far += delay;
2426 fail_count = fail_count.saturating_add(1);
2427 }
2428 ControlFlow::Break(()) => return Err(ctx.error),
2429 }
2430 }
2431 }
2432 }
2433 }
2434
2435 async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
2436 let (tx, rx) = oneshot::channel();
2437 {
2438 let mut w = self.inner.writer.lock().await;
2439 let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
2440 let fk = w.frame_kind.clone();
2441 self.inner.pending.lock().await.insert(msg_id, tx);
2442 send_frame_write(&mut w.write_half, &wire, &fk).await?;
2443 }
2444 match tokio::time::timeout(Duration::from_secs(30), rx).await {
2445 Ok(Ok(result)) => result.map(|_| ()), Ok(Err(_)) => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
2447 Err(_) => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
2448 }
2449 }
2450
2451 async fn init_connection(&self) -> Result<(), InvocationError> {
2454 use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
2455 let req = InvokeWithLayer {
2456 layer: tl::LAYER,
2457 query: InitConnection {
2458 api_id: self.inner.api_id,
2459 device_model: "Linux".to_string(),
2460 system_version: "1.0".to_string(),
2461 app_version: env!("CARGO_PKG_VERSION").to_string(),
2462 system_lang_code: "en".to_string(),
2463 lang_pack: "".to_string(),
2464 lang_code: "en".to_string(),
2465 proxy: None,
2466 params: None,
2467 query: GetConfig {},
2468 },
2469 };
2470
2471 let body = self.rpc_call_raw_serializable(&req).await?;
2473
2474 let mut cur = Cursor::from_slice(&body);
2475 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
2476 let allow_ipv6 = self.inner.allow_ipv6;
2477 let mut opts = self.inner.dc_options.lock().await;
2478 for opt in &cfg.dc_options {
2479 let tl::enums::DcOption::DcOption(o) = opt;
2480 if o.media_only || o.cdn || o.tcpo_only { continue; }
2481 if o.ipv6 && !allow_ipv6 { continue; }
2482 let addr = format!("{}:{}", o.ip_address, o.port);
2483 let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
2484 dc_id: o.id, addr: addr.clone(),
2485 auth_key: None, first_salt: 0, time_offset: 0,
2486 });
2487 entry.addr = addr;
2488 }
2489 log::info!("[layer] initConnection ✓ ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
2490 }
2491 Ok(())
2492 }
2493
2494 async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
2497 let addr = {
2498 let opts = self.inner.dc_options.lock().await;
2499 opts.get(&new_dc_id).map(|e| e.addr.clone())
2500 .unwrap_or_else(|| "149.154.167.51:443".to_string())
2501 };
2502 log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
2503
2504 let saved_key = {
2505 let opts = self.inner.dc_options.lock().await;
2506 opts.get(&new_dc_id).and_then(|e| e.auth_key)
2507 };
2508
2509 let socks5 = self.inner.socks5.clone();
2510 let transport = self.inner.transport.clone();
2511 let conn = if let Some(key) = saved_key {
2512 Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
2513 } else {
2514 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
2515 };
2516
2517 let new_key = conn.auth_key_bytes();
2518 {
2519 let mut opts = self.inner.dc_options.lock().await;
2520 let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
2521 dc_id: new_dc_id, addr: addr.clone(),
2522 auth_key: None, first_salt: 0, time_offset: 0,
2523 });
2524 entry.auth_key = Some(new_key);
2525 }
2526
2527 let (new_writer, new_read, new_fk) = conn.into_writer();
2529 let new_ak = new_writer.enc.auth_key_bytes();
2530 let new_sid = new_writer.enc.session_id();
2531 *self.inner.writer.lock().await = new_writer;
2532 *self.inner.home_dc_id.lock().await = new_dc_id;
2533
2534 let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));
2537
2538 loop {
2548 match self.init_connection().await {
2549 Ok(()) => break,
2550 Err(InvocationError::Rpc(ref r))
2551 if r.flood_wait_seconds().is_some() =>
2552 {
2553 let secs = r.flood_wait_seconds().unwrap();
2554 log::warn!(
2555 "[layer] migrate_to DC{new_dc_id}: init FLOOD_WAIT_{secs} — waiting"
2556 );
2557 sleep(Duration::from_secs(secs + 1)).await;
2558 }
2559 Err(e) => return Err(e),
2560 }
2561 }
2562
2563 self.save_session().await.ok();
2564 log::info!("[layer] Now on DC{new_dc_id} ✓");
2565 Ok(())
2566 }
2567
2568 async fn cache_user(&self, user: &tl::enums::User) {
2571 self.inner.peer_cache.lock().await.cache_user(user);
2572 }
2573
2574 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
2575 let mut cache = self.inner.peer_cache.lock().await;
2576 cache.cache_users(users);
2577 }
2578
2579 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
2580 let mut cache = self.inner.peer_cache.lock().await;
2581 cache.cache_chats(chats);
2582 }
2583
2584 #[doc(hidden)]
2586 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
2587 self.cache_users_slice(users).await;
2588 }
2589
2590 #[doc(hidden)]
2591 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
2592 self.cache_chats_slice(chats).await;
2593 }
2594
2595 #[doc(hidden)]
2597 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
2598 self.rpc_call_raw(req).await
2599 }
2600
2601 async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
2603 let mut fail_count = NonZeroU32::new(1).unwrap();
2604 let mut slept_so_far = Duration::default();
2605 loop {
2606 match self.do_rpc_write_returning_body(req).await {
2607 Ok(body) => return Ok(body),
2608 Err(e) => {
2609 let ctx = RetryContext { fail_count, slept_so_far, error: e };
2610 match self.inner.retry_policy.should_retry(&ctx) {
2611 ControlFlow::Continue(delay) => {
2612 sleep(delay).await;
2613 slept_so_far += delay;
2614 fail_count = fail_count.saturating_add(1);
2615 }
2616 ControlFlow::Break(()) => return Err(ctx.error),
2617 }
2618 }
2619 }
2620 }
2621 }
2622
2623 async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
2624 let (tx, rx) = oneshot::channel();
2625 {
2626 let mut w = self.inner.writer.lock().await;
2627 let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
2628 let fk = w.frame_kind.clone();
2629 self.inner.pending.lock().await.insert(msg_id, tx);
2630 send_frame_write(&mut w.write_half, &wire, &fk).await?;
2631 }
2632 match tokio::time::timeout(Duration::from_secs(30), rx).await {
2633 Ok(Ok(result)) => result,
2634 Ok(Err(_)) => Err(InvocationError::Deserialize("rpc channel closed".into())),
2635 Err(_) => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
2636 }
2637 }
2638
2639 pub fn iter_dialogs(&self) -> DialogIter {
2656 DialogIter {
2657 offset_date: 0,
2658 offset_id: 0,
2659 offset_peer: tl::enums::InputPeer::Empty,
2660 done: false,
2661 buffer: VecDeque::new(),
2662 total: None,
2663 }
2664 }
2665
2666 pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
2680 MessageIter {
2681 peer,
2682 offset_id: 0,
2683 done: false,
2684 buffer: VecDeque::new(),
2685 total: None,
2686 }
2687 }
2688
2689 pub async fn resolve_to_input_peer(
2694 &self,
2695 peer: &tl::enums::Peer,
2696 ) -> Result<tl::enums::InputPeer, InvocationError> {
2697 let cache = self.inner.peer_cache.lock().await;
2698 match peer {
2699 tl::enums::Peer::User(u) => {
2700 if u.user_id == 0 {
2701 return Ok(tl::enums::InputPeer::PeerSelf);
2702 }
2703 match cache.users.get(&u.user_id) {
2704 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
2705 user_id: u.user_id, access_hash: hash,
2706 })),
2707 None => Err(InvocationError::Deserialize(format!(
2708 "access_hash unknown for user {}; resolve via username first", u.user_id
2709 ))),
2710 }
2711 }
2712 tl::enums::Peer::Chat(c) => {
2713 Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
2714 }
2715 tl::enums::Peer::Channel(c) => {
2716 match cache.channels.get(&c.channel_id) {
2717 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
2718 channel_id: c.channel_id, access_hash: hash,
2719 })),
2720 None => Err(InvocationError::Deserialize(format!(
2721 "access_hash unknown for channel {}; resolve via username first", c.channel_id
2722 ))),
2723 }
2724 }
2725 }
2726 }
2727
2728 pub async fn invoke_on_dc<R: RemoteCall>(
2736 &self,
2737 dc_id: i32,
2738 req: &R,
2739 ) -> Result<R::Return, InvocationError> {
2740 let body = self.rpc_on_dc_raw(dc_id, req).await?;
2741 let mut cur = Cursor::from_slice(&body);
2742 R::Return::deserialize(&mut cur).map_err(Into::into)
2743 }
2744
2745 async fn rpc_on_dc_raw<R: RemoteCall>(
2747 &self,
2748 dc_id: i32,
2749 req: &R,
2750 ) -> Result<Vec<u8>, InvocationError> {
2751 let needs_new = {
2753 let pool = self.inner.dc_pool.lock().await;
2754 !pool.has_connection(dc_id)
2755 };
2756
2757 if needs_new {
2758 let addr = {
2759 let opts = self.inner.dc_options.lock().await;
2760 opts.get(&dc_id).map(|e| e.addr.clone())
2761 .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
2762 };
2763
2764 let socks5 = self.inner.socks5.clone();
2765 let transport = self.inner.transport.clone();
2766 let saved_key = {
2767 let opts = self.inner.dc_options.lock().await;
2768 opts.get(&dc_id).and_then(|e| e.auth_key)
2769 };
2770
2771 let dc_conn = if let Some(key) = saved_key {
2772 dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
2773 } else {
2774 let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
2775 let home_dc_id = *self.inner.home_dc_id.lock().await;
2777 if dc_id != home_dc_id {
2778 if let Err(e) = self.export_import_auth(dc_id, &conn).await {
2779 log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
2780 }
2781 }
2782 conn
2783 };
2784
2785 let key = dc_conn.auth_key_bytes();
2786 {
2787 let mut opts = self.inner.dc_options.lock().await;
2788 if let Some(e) = opts.get_mut(&dc_id) {
2789 e.auth_key = Some(key);
2790 }
2791 }
2792 self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
2793 }
2794
2795 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
2796 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
2797 }
2798
2799 async fn export_import_auth(
2801 &self,
2802 dc_id: i32,
2803 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
2805 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
2807 let body = self.rpc_call_raw(&export_req).await?;
2808 let mut cur = Cursor::from_slice(&body);
2809 let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
2810 tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
2811 };
2812
2813 let import_req = tl::functions::auth::ImportAuthorization {
2815 id: exported.id,
2816 bytes: exported.bytes,
2817 };
2818 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
2819 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
2820 log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
2821 Ok(())
2822 }
2823
2824 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
2827 let body = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
2828 let mut cur = Cursor::from_slice(&body);
2829 let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
2830 tl::enums::account::Password::Password(p) => p,
2831 };
2832 Ok(PasswordToken { password: pw })
2833 }
2834
2835 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
2836 tl::functions::auth::SendCode {
2837 phone_number: phone.to_string(),
2838 api_id: self.inner.api_id,
2839 api_hash: self.inner.api_hash.clone(),
2840 settings: tl::enums::CodeSettings::CodeSettings(
2841 tl::types::CodeSettings {
2842 allow_flashcall: false, current_number: false, allow_app_hash: false,
2843 allow_missed_call: false, allow_firebase: false, unknown_number: false,
2844 logout_tokens: None, token: None, app_sandbox: None,
2845 },
2846 ),
2847 }
2848 }
2849
2850 fn extract_user_name(user: &tl::enums::User) -> String {
2851 match user {
2852 tl::enums::User::User(u) => {
2853 format!("{} {}",
2854 u.first_name.as_deref().unwrap_or(""),
2855 u.last_name.as_deref().unwrap_or(""))
2856 .trim().to_string()
2857 }
2858 tl::enums::User::Empty(_) => "(unknown)".into(),
2859 }
2860 }
2861
2862 fn extract_password_params(
2863 algo: &tl::enums::PasswordKdfAlgo,
2864 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
2865 match algo {
2866 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
2867 Ok((&a.salt1, &a.salt2, &a.p, a.g))
2868 }
2869 _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
2870 }
2871 }
2872}
2873
/// Pagination state for walking the dialog list page by page.
///
/// Created by `Client::iter_dialogs`; advanced with `DialogIter::next`.
pub struct DialogIter {
    /// `offset_date` sent with the next `GetDialogs` request.
    offset_date: i32,
    /// `offset_id` sent with the next `GetDialogs` request.
    offset_id: i32,
    /// `offset_peer` sent with the next `GetDialogs` request.
    offset_peer: tl::enums::InputPeer,
    /// Set once a page shorter than the page size has been received.
    done: bool,
    /// Dialogs already fetched but not yet handed out.
    buffer: VecDeque<Dialog>,
    /// Count reported alongside the first fetched page, if any.
    pub total: Option<i32>,
}
2887
2888impl DialogIter {
2889 const PAGE_SIZE: i32 = 100;
2890
2891 pub fn total(&self) -> Option<i32> { self.total }
2897
2898 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
2900 if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
2901 if self.done { return Ok(None); }
2902
2903 let req = tl::functions::messages::GetDialogs {
2904 exclude_pinned: false,
2905 folder_id: None,
2906 offset_date: self.offset_date,
2907 offset_id: self.offset_id,
2908 offset_peer: self.offset_peer.clone(),
2909 limit: Self::PAGE_SIZE,
2910 hash: 0,
2911 };
2912
2913 let (dialogs, count) = client.get_dialogs_raw_with_count(req).await?;
2914 if self.total.is_none() {
2916 self.total = count;
2917 }
2918 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
2919 self.done = true;
2920 }
2921
2922 if let Some(last) = dialogs.last() {
2924 self.offset_date = last.message.as_ref().map(|m| match m {
2925 tl::enums::Message::Message(x) => x.date,
2926 tl::enums::Message::Service(x) => x.date,
2927 _ => 0,
2928 }).unwrap_or(0);
2929 self.offset_id = last.top_message();
2930 if let Some(peer) = last.peer() {
2931 self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
2932 }
2933 }
2934
2935 self.buffer.extend(dialogs);
2936 Ok(self.buffer.pop_front())
2937 }
2938}
2939
/// Pagination state for walking a peer's message history page by page.
///
/// Created by `Client::iter_messages`; advanced with `MessageIter::next`.
pub struct MessageIter {
    /// Peer whose messages are being fetched.
    peer: tl::enums::Peer,
    /// Offset id passed to the next page request (id of the last message of
    /// the previous page, 0 initially).
    offset_id: i32,
    /// Set once a page shorter than the page size has been received.
    done: bool,
    /// Messages already fetched but not yet handed out.
    buffer: VecDeque<update::IncomingMessage>,
    /// Count reported alongside the first fetched page, if any.
    pub total: Option<i32>,
}
2951
2952impl MessageIter {
2953 const PAGE_SIZE: i32 = 100;
2954
2955 pub fn total(&self) -> Option<i32> { self.total }
2960
2961 pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
2963 if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
2964 if self.done { return Ok(None); }
2965
2966 let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
2967 let (page, count) = client.get_messages_with_count(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
2968
2969 if self.total.is_none() { self.total = count; }
2970
2971 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
2972 self.done = true;
2973 }
2974 if let Some(last) = page.last() {
2975 self.offset_id = last.id();
2976 }
2977
2978 self.buffer.extend(page);
2979 Ok(self.buffer.pop_front())
2980 }
2981}
2982
2983#[doc(hidden)]
2987pub fn random_i64_pub() -> i64 { random_i64() }
2988
/// Framing used when writing to and reading from a connection's socket.
#[derive(Clone)]
enum FrameKind {
    /// Length prefix counts 4-byte words: one byte when below `0x7f`,
    /// otherwise `0x7f` followed by a 3-byte little-endian count.
    Abridged,
    /// 4-byte little-endian byte-length prefix.
    Intermediate,
    /// Handled like `Intermediate` by the framing helpers in this file; the
    /// sequence-number fields are currently unread (hence `dead_code`).
    #[allow(dead_code)]
    Full { send_seqno: u32, recv_seqno: u32 },
}
2999
3000
/// Write side of a connection, produced by `Connection::into_writer`.
struct ConnectionWriter {
    /// Owned write half of the split TCP stream.
    write_half: OwnedWriteHalf,
    /// Encrypted session used to pack outgoing requests.
    enc: EncryptedSession,
    /// Framing to apply to every outgoing frame.
    frame_kind: FrameKind,
}
3010
3011impl ConnectionWriter {
3012 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
3013 fn first_salt(&self) -> i64 { self.enc.salt }
3014 fn time_offset(&self) -> i32 { self.enc.time_offset }
3015}
3016
/// A freshly established connection, before it is split into read and write
/// halves by `into_writer`.
struct Connection {
    /// Unsplit TCP stream to the data center (or through the proxy).
    stream: TcpStream,
    /// Encrypted session bound to this connection's auth key.
    enc: EncryptedSession,
    /// Framing selected during transport initialisation.
    frame_kind: FrameKind,
}
3022
3023impl Connection {
3024 async fn open_stream(
3026 addr: &str,
3027 socks5: Option<&crate::socks5::Socks5Config>,
3028 transport: &TransportKind,
3029 ) -> Result<(TcpStream, FrameKind), InvocationError> {
3030 let stream = match socks5 {
3031 Some(proxy) => proxy.connect(addr).await?,
3032 None => {
3033 let stream = TcpStream::connect(addr).await
3036 .map_err(InvocationError::Io)?;
3037
3038 {
3044 let sock = socket2::SockRef::from(&stream);
3045 let keepalive = TcpKeepalive::new()
3046 .with_time(Duration::from_secs(TCP_KEEPALIVE_IDLE_SECS))
3047 .with_interval(Duration::from_secs(TCP_KEEPALIVE_INTERVAL_SECS));
3048 #[cfg(not(target_os = "windows"))]
3049 let keepalive = keepalive.with_retries(TCP_KEEPALIVE_PROBES);
3050 sock.set_tcp_keepalive(&keepalive).ok();
3051 }
3052 stream
3053 }
3054 };
3055 Self::apply_transport_init(stream, transport).await
3056 }
3057
3058 async fn apply_transport_init(
3060 mut stream: TcpStream,
3061 transport: &TransportKind,
3062 ) -> Result<(TcpStream, FrameKind), InvocationError> {
3063 match transport {
3064 TransportKind::Abridged => {
3065 stream.write_all(&[0xef]).await?;
3066 Ok((stream, FrameKind::Abridged))
3067 }
3068 TransportKind::Intermediate => {
3069 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
3070 Ok((stream, FrameKind::Intermediate))
3071 }
3072 TransportKind::Full => {
3073 Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
3075 }
3076 TransportKind::Obfuscated { secret } => {
3077 let mut nonce = [0u8; 64];
3086 getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
3087 let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
3089 let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
3090 let mut handshake = nonce;
3092 handshake[56] = 0xef; handshake[57] = 0xef;
3093 handshake[58] = 0xef; handshake[59] = 0xef;
3094 enc_cipher.apply(&mut handshake[56..]);
3095 stream.write_all(&handshake).await?;
3096 Ok((stream, FrameKind::Abridged))
3097 }
3098 }
3099 }
3100
3101 async fn connect_raw(
3102 addr: &str,
3103 socks5: Option<&crate::socks5::Socks5Config>,
3104 transport: &TransportKind,
3105 ) -> Result<Self, InvocationError> {
3106 log::info!("[layer] Connecting to {addr} (DH) …");
3107
3108 let addr2 = addr.to_string();
3112 let socks5_c = socks5.cloned();
3113 let transport_c = transport.clone();
3114
3115 let fut = async move {
3116 let (mut stream, frame_kind) =
3117 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
3118
3119 let mut plain = Session::new();
3120
3121 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3122 send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
3123 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
3124
3125 let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3126 send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
3127 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
3128
3129 let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3130 send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
3131 let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;
3132
3133 let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
3134 log::info!("[layer] DH complete ✓");
3135
3136 Ok::<Self, InvocationError>(Self {
3137 stream,
3138 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
3139 frame_kind,
3140 })
3141 };
3142
3143 tokio::time::timeout(Duration::from_secs(15), fut)
3144 .await
3145 .map_err(|_| InvocationError::Deserialize(
3146 format!("DH handshake with {addr} timed out after 15 s")
3147 ))?
3148 }
3149
3150 async fn connect_with_key(
3151 addr: &str,
3152 auth_key: [u8; 256],
3153 first_salt: i64,
3154 time_offset: i32,
3155 socks5: Option<&crate::socks5::Socks5Config>,
3156 transport: &TransportKind,
3157 ) -> Result<Self, InvocationError> {
3158 let addr2 = addr.to_string();
3159 let socks5_c = socks5.cloned();
3160 let transport_c = transport.clone();
3161
3162 let fut = async move {
3163 let (stream, frame_kind) =
3164 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
3165 Ok::<Self, InvocationError>(Self {
3166 stream,
3167 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
3168 frame_kind,
3169 })
3170 };
3171
3172 tokio::time::timeout(Duration::from_secs(15), fut)
3173 .await
3174 .map_err(|_| InvocationError::Deserialize(
3175 format!("connect_with_key to {addr} timed out after 15 s")
3176 ))?
3177 }
3178
3179 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
3180
3181 fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
3183 let (read_half, write_half) = self.stream.into_split();
3184 let writer = ConnectionWriter {
3185 write_half,
3186 enc: self.enc,
3187 frame_kind: self.frame_kind.clone(),
3188 };
3189 (writer, read_half, self.frame_kind)
3190 }
3191}
3192
3193async fn send_frame(
3197 stream: &mut TcpStream,
3198 data: &[u8],
3199 kind: &FrameKind,
3200) -> Result<(), InvocationError> {
3201 match kind {
3202 FrameKind::Abridged => send_abridged(stream, data).await,
3203 FrameKind::Intermediate => {
3204 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3205 stream.write_all(data).await?;
3206 Ok(())
3207 }
3208 FrameKind::Full { .. } => {
3209 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3215 stream.write_all(data).await?;
3216 Ok(())
3217 }
3218 }
3219}
3220
/// Result of one `recv_frame_with_keepalive` call.
enum FrameOutcome {
    /// A complete frame was read from the socket.
    Frame(Vec<u8>),
    /// Reading the frame, or sending the keepalive ping, failed.
    Error(InvocationError),
    /// No frame arrived within the ping delay and a ping was sent instead.
    Keepalive,
}
3229
3230async fn recv_frame_with_keepalive(
3237 rh: &mut OwnedReadHalf,
3238 fk: &FrameKind,
3239 client: &Client,
3240 _ak: &[u8; 256],
3241) -> FrameOutcome {
3242 match tokio::time::timeout(Duration::from_secs(PING_DELAY_SECS), recv_frame_read(rh, fk)).await {
3243 Ok(Ok(raw)) => FrameOutcome::Frame(raw),
3244 Ok(Err(e)) => FrameOutcome::Error(e),
3245 Err(_) => {
3246 let ping_req = tl::functions::PingDelayDisconnect {
3250 ping_id: random_i64(),
3251 disconnect_delay: NO_PING_DISCONNECT,
3252 };
3253 let mut w = client.inner.writer.lock().await;
3254 let wire = w.enc.pack(&ping_req);
3255 let fk = w.frame_kind.clone();
3256 match send_frame_write(&mut w.write_half, &wire, &fk).await {
3257 Ok(()) => FrameOutcome::Keepalive,
3258 Err(e) => FrameOutcome::Error(e),
3262 }
3263 }
3264 }
3265}
3266
3267async fn send_frame_write(
3269 stream: &mut OwnedWriteHalf,
3270 data: &[u8],
3271 kind: &FrameKind,
3272) -> Result<(), InvocationError> {
3273 match kind {
3274 FrameKind::Abridged => {
3275 let words = data.len() / 4;
3276 if words < 0x7f {
3277 stream.write_all(&[words as u8]).await?;
3278 } else {
3279 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
3280 stream.write_all(&b).await?;
3281 }
3282 stream.write_all(data).await?;
3283 Ok(())
3284 }
3285 FrameKind::Intermediate | FrameKind::Full { .. } => {
3286 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
3287 stream.write_all(data).await?;
3288 Ok(())
3289 }
3290 }
3291}
3292
3293async fn recv_frame_read(
3295 stream: &mut OwnedReadHalf,
3296 kind: &FrameKind,
3297) -> Result<Vec<u8>, InvocationError> {
3298 match kind {
3299 FrameKind::Abridged => {
3300 let mut h = [0u8; 1];
3301 stream.read_exact(&mut h).await?;
3302 let words = if h[0] < 0x7f {
3303 h[0] as usize
3304 } else {
3305 let mut b = [0u8; 3];
3306 stream.read_exact(&mut b).await?;
3307 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
3308 };
3309 let len = words * 4;
3310 let mut buf = vec![0u8; len];
3311 stream.read_exact(&mut buf).await?;
3312 Ok(buf)
3313 }
3314 FrameKind::Intermediate | FrameKind::Full { .. } => {
3315 let mut len_buf = [0u8; 4];
3316 stream.read_exact(&mut len_buf).await?;
3317 let len = u32::from_le_bytes(len_buf) as usize;
3318 let mut buf = vec![0u8; len];
3319 stream.read_exact(&mut buf).await?;
3320 Ok(buf)
3321 }
3322 }
3323}
3324
3325
3326async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
3328 let words = data.len() / 4;
3329 if words < 0x7f {
3330 stream.write_all(&[words as u8]).await?;
3331 } else {
3332 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
3333 stream.write_all(&b).await?;
3334 }
3335 stream.write_all(data).await?;
3336 Ok(())
3337}
3338
3339async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
3340 let mut h = [0u8; 1];
3341 stream.read_exact(&mut h).await?;
3342 let words = if h[0] < 0x7f {
3343 h[0] as usize
3344 } else {
3345 let mut b = [0u8; 3];
3346 stream.read_exact(&mut b).await?;
3347 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
3348 if w == 1 {
3350 let mut code_buf = [0u8; 4];
3351 stream.read_exact(&mut code_buf).await?;
3352 let code = i32::from_le_bytes(code_buf);
3353 return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
3354 }
3355 w
3356 };
3357 if words == 0 || words > 0x8000 {
3360 return Err(InvocationError::Deserialize(
3361 format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
3362 ));
3363 }
3364 let mut buf = vec![0u8; words * 4];
3365 stream.read_exact(&mut buf).await?;
3366 Ok(buf)
3367}
3368
3369async fn recv_frame_plain<T: Deserializable>(
3371 stream: &mut TcpStream,
3372 _kind: &FrameKind,
3373) -> Result<T, InvocationError> {
3374 let raw = recv_abridged(stream).await?; if raw.len() < 20 {
3376 return Err(InvocationError::Deserialize("plaintext frame too short".into()));
3377 }
3378 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
3379 return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
3380 }
3381 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
3382 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
3383 T::deserialize(&mut cur).map_err(Into::into)
3384}
3385
/// What `unwrap_envelope` found inside a decrypted message body.
enum EnvelopeResult {
    /// Bytes of an object that is none of the recognised envelope, service
    /// or update constructors (after unwrapping any envelopes around it).
    Payload(Vec<u8>),
    /// Updates parsed from an updates constructor.
    Updates(Vec<update::Update>),
    /// A service message (or an empty container) with nothing to deliver.
    None,
}
3393
3394fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
3395 if body.len() < 4 {
3396 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
3397 }
3398 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
3399
3400 match cid {
3401 ID_RPC_RESULT => {
3402 if body.len() < 12 {
3403 return Err(InvocationError::Deserialize("rpc_result too short".into()));
3404 }
3405 unwrap_envelope(body[12..].to_vec())
3406 }
3407 ID_RPC_ERROR => {
3408 if body.len() < 8 {
3409 return Err(InvocationError::Deserialize("rpc_error too short".into()));
3410 }
3411 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
3412 let message = tl_read_string(&body[8..]).unwrap_or_default();
3413 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
3414 }
3415 ID_MSG_CONTAINER => {
3416 if body.len() < 8 {
3417 return Err(InvocationError::Deserialize("container too short".into()));
3418 }
3419 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
3420 let mut pos = 8usize;
3421 let mut payload: Option<Vec<u8>> = None;
3422 let mut updates_buf: Vec<update::Update> = Vec::new();
3423
3424 for _ in 0..count {
3425 if pos + 16 > body.len() { break; }
3426 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
3427 pos += 16;
3428 if pos + inner_len > body.len() { break; }
3429 let inner = body[pos..pos + inner_len].to_vec();
3430 pos += inner_len;
3431 match unwrap_envelope(inner)? {
3432 EnvelopeResult::Payload(p) => { payload = Some(p); }
3433 EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
3434 EnvelopeResult::None => {}
3435 }
3436 }
3437 if let Some(p) = payload {
3438 Ok(EnvelopeResult::Payload(p))
3439 } else if !updates_buf.is_empty() {
3440 Ok(EnvelopeResult::Updates(updates_buf))
3441 } else {
3442 Ok(EnvelopeResult::None)
3443 }
3444 }
3445 ID_GZIP_PACKED => {
3446 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
3447 unwrap_envelope(gz_inflate(&bytes)?)
3448 }
3449 ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
3454 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
3467 Ok(EnvelopeResult::None)
3468 }
3469 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
3470 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
3471 | ID_UPDATES_TOO_LONG => {
3472 Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
3473 }
3474 _ => Ok(EnvelopeResult::Payload(body)),
3475 }
3476}
3477
3478fn random_i64() -> i64 {
3481 let mut b = [0u8; 8];
3482 getrandom::getrandom(&mut b).expect("getrandom");
3483 i64::from_le_bytes(b)
3484}
3485
3486fn jitter_delay(base_ms: u64) -> Duration {
3490 let mut b = [0u8; 2];
3492 getrandom::getrandom(&mut b).unwrap_or(());
3493 let rand_frac = u16::from_le_bytes(b) as f64 / 65535.0; let factor = 0.80 + rand_frac * 0.40; Duration::from_millis((base_ms as f64 * factor) as u64)
3496}
3497
/// Decodes a TL-style length-prefixed byte string from the start of `data`.
///
/// A first byte below 254 is the length itself; otherwise the next three
/// bytes hold the length in little-endian order. Empty input decodes to an
/// empty vector.
///
/// Returns `None` when the long-form header is incomplete or when `data` is
/// shorter than the declared length.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let first = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };
    let (len, start) = if first < 254 {
        (usize::from(first), 1)
    } else {
        if data.len() < 4 {
            return None;
        }
        let len = usize::from(data[1]) | usize::from(data[2]) << 8 | usize::from(data[3]) << 16;
        (len, 4)
    };
    data.get(start..start + len).map(|payload| payload.to_vec())
}
3507
3508fn tl_read_string(data: &[u8]) -> Option<String> {
3509 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
3510}
3511
3512fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
3513 use std::io::Read;
3514 let mut out = Vec::new();
3515 if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
3516 return Ok(out);
3517 }
3518 out.clear();
3519 flate2::read::ZlibDecoder::new(data)
3520 .read_to_end(&mut out)
3521 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
3522 Ok(out)
3523}