1#![deny(unsafe_code)]
21
22mod errors;
23mod retry;
24mod session;
25mod transport;
26mod two_factor_auth;
27pub mod update;
28pub mod parsers;
29pub mod media;
30pub mod participants;
31pub mod pts;
32
33pub mod dc_pool;
35pub mod transport_obfuscated;
36pub mod transport_intermediate;
37pub mod socks5;
38pub mod session_backend;
39pub mod inline_iter;
40pub mod typing_guard;
41
42pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
43pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
44pub use update::Update;
45pub use media::{UploadedFile, DownloadIter};
46pub use participants::Participant;
47pub use typing_guard::TypingGuard;
48pub use socks5::Socks5Config;
49pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
50
51use std::collections::HashMap;
52use std::collections::VecDeque;
53use std::num::NonZeroU32;
54use std::ops::ControlFlow;
55use std::sync::Arc;
56use std::time::Duration;
57
58use layer_tl_types as tl;
59use layer_mtproto::{EncryptedSession, Session, authentication as auth};
60use layer_tl_types::{Cursor, Deserializable, RemoteCall};
61use session::{DcEntry, PersistedSession};
62use tokio::io::{AsyncReadExt, AsyncWriteExt};
63use tokio::net::TcpStream;
64use tokio::sync::{mpsc, Mutex};
65use tokio::time::sleep;
66
// TL constructor IDs. The values match the constructors of the same names in
// the public MTProto / Telegram API schemas; the code that consumes them is
// outside this part of the file.

// MTProto service messages.
const ID_RPC_RESULT: u32 = 0xf35c6d01; // rpc_result
const ID_RPC_ERROR: u32 = 0x2144ca19; // rpc_error
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc; // msg_container
const ID_GZIP_PACKED: u32 = 0x3072cfa1; // gzip_packed
const ID_PONG: u32 = 0x347773c5; // pong
const ID_MSGS_ACK: u32 = 0x62d6b459; // msgs_ack
const ID_BAD_SERVER_SALT: u32 = 0xedab447b; // bad_server_salt
const ID_NEW_SESSION: u32 = 0x9ec20908; // new_session_created
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811; // bad_msg_notification

// `Updates` family (API layer).
const ID_UPDATES: u32 = 0x74ae4240; // updates
const ID_UPDATE_SHORT: u32 = 0x78d4dec1; // updateShort
const ID_UPDATES_COMBINED: u32 = 0x725b04c3; // updatesCombined
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8; // updateShortMessage
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5; // updateShortChatMessage
const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e; // updatesTooLong
84
/// In-memory lookup tables of peer access hashes.
///
/// Entries are added by `cache_user` / `cache_chat` and read back when an
/// `InputPeer` has to be built from a bare `Peer`.
#[derive(Default)]
pub(crate) struct PeerCache {
    /// User id → access hash.
    pub(crate) users: HashMap<i64, i64>,
    /// Channel id → access hash.
    pub(crate) channels: HashMap<i64, i64>,
}
96
97impl PeerCache {
98 fn cache_user(&mut self, user: &tl::enums::User) {
99 if let tl::enums::User::User(u) = user {
100 if let Some(hash) = u.access_hash {
101 self.users.insert(u.id, hash);
102 }
103 }
104 }
105
106 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
107 match chat {
108 tl::enums::Chat::Channel(c) => {
109 if let Some(hash) = c.access_hash {
110 self.channels.insert(c.id, hash);
111 }
112 }
113 tl::enums::Chat::ChannelForbidden(c) => {
114 self.channels.insert(c.id, c.access_hash);
115 }
116 _ => {}
117 }
118 }
119
120 fn cache_users(&mut self, users: &[tl::enums::User]) {
121 for u in users { self.cache_user(u); }
122 }
123
124 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
125 for c in chats { self.cache_chat(c); }
126 }
127
128 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
129 if user_id == 0 {
130 return tl::enums::InputPeer::PeerSelf;
131 }
132 let hash = self.users.get(&user_id).copied().unwrap_or(0);
133 tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
134 }
135
136 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
137 let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
138 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
139 }
140
141 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
142 match peer {
143 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
144 tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
145 tl::types::InputPeerChat { chat_id: c.chat_id }
146 ),
147 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
148 }
149 }
150}
151
/// Description of an outgoing message, built with the chainable setters on
/// this type and consumed by `Client::send_message_to_peer_ex`, which copies
/// the fields into a `messages.sendMessage` request.
#[derive(Clone, Default)]
pub struct InputMessage {
    /// Message body.
    pub text: String,
    /// Id of the message being replied to, if any.
    pub reply_to: Option<i32>,
    /// Forwarded as the request's `silent` flag.
    pub silent: bool,
    /// Forwarded as the request's `background` flag.
    pub background: bool,
    /// Forwarded as the request's `clear_draft` flag.
    pub clear_draft: bool,
    /// Forwarded as the request's `no_webpage` flag.
    pub no_webpage: bool,
    /// Optional formatting entities for `text`.
    pub entities: Option<Vec<tl::enums::MessageEntity>>,
    /// Optional reply markup attached to the message.
    pub reply_markup: Option<tl::enums::ReplyMarkup>,
    /// Forwarded as the request's `schedule_date`.
    pub schedule_date: Option<i32>,
}
175
176impl InputMessage {
177 pub fn text(text: impl Into<String>) -> Self {
179 Self { text: text.into(), ..Default::default() }
180 }
181
182 pub fn set_text(mut self, text: impl Into<String>) -> Self {
184 self.text = text.into(); self
185 }
186
187 pub fn reply_to(mut self, id: Option<i32>) -> Self {
189 self.reply_to = id; self
190 }
191
192 pub fn silent(mut self, v: bool) -> Self {
194 self.silent = v; self
195 }
196
197 pub fn background(mut self, v: bool) -> Self {
199 self.background = v; self
200 }
201
202 pub fn clear_draft(mut self, v: bool) -> Self {
204 self.clear_draft = v; self
205 }
206
207 pub fn no_webpage(mut self, v: bool) -> Self {
209 self.no_webpage = v; self
210 }
211
212 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
214 self.entities = Some(e); self
215 }
216
217 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
219 self.reply_markup = Some(rm); self
220 }
221
222 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
224 self.schedule_date = ts; self
225 }
226
227 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
228 self.reply_to.map(|id| {
229 tl::enums::InputReplyTo::Message(
230 tl::types::InputReplyToMessage {
231 reply_to_msg_id: id,
232 top_msg_id: None,
233 reply_to_peer_id: None,
234 quote_text: None,
235 quote_entities: None,
236 quote_offset: None,
237 monoforum_peer_id: None,
238 todo_item_id: None,
239 }
240 )
241 })
242 }
243}
244
245impl From<&str> for InputMessage {
246 fn from(s: &str) -> Self { Self::text(s) }
247}
248
249impl From<String> for InputMessage {
250 fn from(s: String) -> Self { Self::text(s) }
251}
252
/// Selects which transport the connection layer should use.
///
/// The value is passed by reference to the `Connection` constructors; how
/// each variant is framed on the wire is implemented outside this part of
/// the file. NOTE(review): the variant names look like the MTProto transport
/// names (abridged / intermediate / full / obfuscated) — confirm against the
/// transport modules.
#[derive(Clone, Debug, Default)]
pub enum TransportKind {
    /// Default transport.
    #[default]
    Abridged,
    Intermediate,
    Full,
    /// Obfuscated transport with an optional 16-byte secret.
    /// NOTE(review): presumably a proxy secret — confirm.
    Obfuscated { secret: Option<[u8; 16]> },
}
286
/// Settings consumed by `Client::connect`.
#[derive(Clone)]
pub struct Config {
    /// Application id sent with authorization requests.
    pub api_id: i32,
    /// Application hash sent with authorization requests.
    pub api_hash: String,
    /// Optional data-centre address.
    /// NOTE(review): `Client::connect` as visible here never reads this
    /// field — confirm whether it is honoured elsewhere.
    pub dc_addr: Option<String>,
    /// Retry policy stored on the client (defaults to `AutoSleep`).
    pub retry_policy: Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy settings handed to the connection layer.
    pub socks5: Option<crate::socks5::Socks5Config>,
    /// Stored on the client as-is (defaults to `false`).
    pub allow_ipv6: bool,
    /// Transport used for every connection the client opens.
    pub transport: TransportKind,
    /// Backend used to load and save the persisted session
    /// (defaults to a `BinaryFileBackend` at `"layer.session"`).
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
}
305
306impl Default for Config {
307 fn default() -> Self {
308 Self {
309 api_id: 0,
310 api_hash: String::new(),
311 dc_addr: None,
312 retry_policy: Arc::new(AutoSleep::default()),
313 socks5: None,
314 allow_ipv6: false,
315 transport: TransportKind::Abridged,
316 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
317 }
318 }
319}
320
/// Receiving end of the update channel returned by `Client::stream_updates`.
pub struct UpdateStream {
    /// Receiver fed by the background update loop.
    rx: mpsc::UnboundedReceiver<update::Update>,
}
327
328impl UpdateStream {
329 pub async fn next(&mut self) -> Option<update::Update> {
331 self.rx.recv().await
332 }
333}
334
/// A dialog together with the related objects that were resolved for it.
#[derive(Debug, Clone)]
pub struct Dialog {
    /// The raw TL dialog object.
    pub raw: tl::enums::Dialog,
    /// Message associated with the dialog, if one was resolved.
    pub message: Option<tl::enums::Message>,
    /// User this dialog is with, if any.
    pub entity: Option<tl::enums::User>,
    /// Chat or channel this dialog belongs to, if any.
    pub chat: Option<tl::enums::Chat>,
}
345
346impl Dialog {
347 pub fn title(&self) -> String {
349 if let Some(tl::enums::User::User(u)) = &self.entity {
350 let first = u.first_name.as_deref().unwrap_or("");
351 let last = u.last_name.as_deref().unwrap_or("");
352 let name = format!("{first} {last}").trim().to_string();
353 if !name.is_empty() { return name; }
354 }
355 if let Some(chat) = &self.chat {
356 return match chat {
357 tl::enums::Chat::Chat(c) => c.title.clone(),
358 tl::enums::Chat::Forbidden(c) => c.title.clone(),
359 tl::enums::Chat::Channel(c) => c.title.clone(),
360 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
361 tl::enums::Chat::Empty(_) => "(empty)".into(),
362 };
363 }
364 "(Unknown)".to_string()
365 }
366
367 pub fn peer(&self) -> Option<&tl::enums::Peer> {
369 match &self.raw {
370 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
371 tl::enums::Dialog::Folder(_) => None,
372 }
373 }
374
375 pub fn unread_count(&self) -> i32 {
377 match &self.raw {
378 tl::enums::Dialog::Dialog(d) => d.unread_count,
379 _ => 0,
380 }
381 }
382
383 pub fn top_message(&self) -> i32 {
385 match &self.raw {
386 tl::enums::Dialog::Dialog(d) => d.top_message,
387 _ => 0,
388 }
389 }
390}
391
/// Shared state behind every clone of `Client`.
struct ClientInner {
    /// Connection to the home data centre.
    conn: Mutex<Connection>,
    /// Id of the data centre `conn` is attached to.
    home_dc_id: Mutex<i32>,
    /// Known data-centre entries keyed by DC id.
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache used to build `InputPeer` values.
    pub(crate) peer_cache: Mutex<PeerCache>,
    /// Update-sequence bookkeeping (see the `pts` module).
    pub(crate) pts_state: Mutex<pts::PtsState>,
    /// Application id copied from `Config`.
    api_id: i32,
    /// Application hash copied from `Config`.
    api_hash: String,
    /// Retry policy copied from `Config`.
    retry_policy: Arc<dyn RetryPolicy>,
    /// Proxy settings reused whenever a new connection is opened.
    socks5: Option<crate::socks5::Socks5Config>,
    /// Copied from `Config`.
    allow_ipv6: bool,
    /// Transport reused whenever a new connection is opened.
    transport: TransportKind,
    /// Backend used by `save_session`.
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Pool built from the known DC entries; its keys are merged into the
    /// session when saving.
    dc_pool: Mutex<dc_pool::DcPool>,
    /// Sending half of the update channel created in `Client::connect`.
    update_tx: mpsc::UnboundedSender<update::Update>,
}
410
/// Cheaply cloneable handle to a connected client; all clones share the same
/// `ClientInner`.
#[derive(Clone)]
pub struct Client {
    /// Shared client state.
    pub(crate) inner: Arc<ClientInner>,
    /// Receiving half of the channel whose sender is `ClientInner::update_tx`.
    /// NOTE(review): presumably held only to keep that channel open — confirm.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
417
418impl Client {
419 pub async fn connect(config: Config) -> Result<Self, InvocationError> {
422 let (update_tx, update_rx) = mpsc::unbounded_channel();
423
424 let socks5 = config.socks5.clone();
426 let transport = config.transport.clone();
427
428 let (conn, home_dc_id, dc_opts) =
429 match config.session_backend.load()
430 .map_err(InvocationError::Io)?
431 {
432 Some(s) => {
433 if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
434 if let Some(key) = dc.auth_key {
435 log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
436 match Connection::connect_with_key(
437 &dc.addr, key, dc.first_salt, dc.time_offset,
438 socks5.as_ref(), &transport,
439 ).await {
440 Ok(c) => {
441 let mut opts = session::default_dc_addresses()
442 .into_iter()
443 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
444 .collect::<HashMap<_, _>>();
445 for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
446 (c, s.home_dc_id, opts)
447 }
448 Err(e) => {
449 log::warn!("[layer] Session connect failed ({e}), fresh connect …");
450 Self::fresh_connect(socks5.as_ref(), &transport).await?
451 }
452 }
453 } else {
454 Self::fresh_connect(socks5.as_ref(), &transport).await?
455 }
456 } else {
457 Self::fresh_connect(socks5.as_ref(), &transport).await?
458 }
459 }
460 None => Self::fresh_connect(socks5.as_ref(), &transport).await?,
461 };
462
463 let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());
465
466 let inner = Arc::new(ClientInner {
467 conn: Mutex::new(conn),
468 home_dc_id: Mutex::new(home_dc_id),
469 dc_options: Mutex::new(dc_opts),
470 peer_cache: Mutex::new(PeerCache::default()),
471 pts_state: Mutex::new(pts::PtsState::default()),
472 api_id: config.api_id,
473 api_hash: config.api_hash,
474 retry_policy: config.retry_policy,
475 socks5: config.socks5,
476 allow_ipv6: config.allow_ipv6,
477 transport: config.transport,
478 session_backend: config.session_backend,
479 dc_pool: Mutex::new(pool),
480 update_tx: update_tx,
481 });
482
483 let client = Self {
484 inner,
485 _update_rx: Arc::new(Mutex::new(update_rx)),
486 };
487
488 if let Err(e) = client.init_connection().await {
493 log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");
494
495 let socks5_r = client.inner.socks5.clone();
496 let transport_r = client.inner.transport.clone();
497 let (new_conn, new_dc_id, new_opts) =
498 Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;
499
500 {
501 let mut conn_guard = client.inner.conn.lock().await;
502 *conn_guard = new_conn;
503 }
504 {
505 let mut dc_guard = client.inner.home_dc_id.lock().await;
506 *dc_guard = new_dc_id;
507 }
508 {
509 let mut opts_guard = client.inner.dc_options.lock().await;
510 *opts_guard = new_opts;
511 }
512
513 client.init_connection().await?;
514 }
515
516 let _ = client.sync_pts_state().await;
517 Ok(client)
518 }
519
520 async fn fresh_connect(
521 socks5: Option<&crate::socks5::Socks5Config>,
522 transport: &TransportKind,
523 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
524 log::info!("[layer] Fresh connect to DC2 …");
525 let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
526 let opts = session::default_dc_addresses()
527 .into_iter()
528 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
529 .collect();
530 Ok((conn, 2, opts))
531 }
532
533 pub async fn save_session(&self) -> Result<(), InvocationError> {
536 let conn_guard = self.inner.conn.lock().await;
537 let home_dc_id = *self.inner.home_dc_id.lock().await;
538 let dc_options = self.inner.dc_options.lock().await;
539
540 let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
541 dc_id: e.dc_id,
542 addr: e.addr.clone(),
543 auth_key: if e.dc_id == home_dc_id { Some(conn_guard.auth_key_bytes()) } else { e.auth_key },
544 first_salt: if e.dc_id == home_dc_id { conn_guard.first_salt() } else { e.first_salt },
545 time_offset: if e.dc_id == home_dc_id { conn_guard.time_offset() } else { e.time_offset },
546 }).collect();
547 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
549
550 self.inner.session_backend
551 .save(&PersistedSession { home_dc_id, dcs })
552 .map_err(InvocationError::Io)?;
553 log::info!("[layer] Session saved ✓");
554 Ok(())
555 }
556
557 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
561 match self.invoke(&tl::functions::updates::GetState {}).await {
562 Ok(_) => Ok(true),
563 Err(e) if e.is("AUTH_KEY_UNREGISTERED")
564 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
565 Err(e) => Err(e),
566 }
567 }
568
569 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
571 let req = tl::functions::auth::ImportBotAuthorization {
572 flags: 0, api_id: self.inner.api_id,
573 api_hash: self.inner.api_hash.clone(),
574 bot_auth_token: token.to_string(),
575 };
576
577 let result = match self.invoke(&req).await {
578 Ok(x) => x,
579 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
580 let dc_id = r.value.unwrap_or(2) as i32;
581 self.migrate_to(dc_id).await?;
582 self.invoke(&req).await?
583 }
584 Err(e) => return Err(e),
585 };
586
587 let name = match result {
588 tl::enums::auth::Authorization::Authorization(a) => {
589 self.cache_user(&a.user).await;
590 Self::extract_user_name(&a.user)
591 }
592 tl::enums::auth::Authorization::SignUpRequired(_) => {
593 panic!("unexpected SignUpRequired during bot sign-in")
594 }
595 };
596 log::info!("[layer] Bot signed in ✓ ({name})");
597 Ok(name)
598 }
599
600 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
602 use tl::enums::auth::SentCode;
603
604 let req = self.make_send_code_req(phone);
605 let body = match self.rpc_call_raw(&req).await {
606 Ok(b) => b,
607 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
608 let dc_id = r.value.unwrap_or(2) as i32;
609 self.migrate_to(dc_id).await?;
610 self.rpc_call_raw(&req).await?
611 }
612 Err(e) => return Err(e),
613 };
614
615 let mut cur = Cursor::from_slice(&body);
616 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
617 SentCode::SentCode(s) => s.phone_code_hash,
618 SentCode::Success(_) => return Err(InvocationError::Deserialize("unexpected Success".into())),
619 SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
620 };
621 log::info!("[layer] Login code sent");
622 Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
623 }
624
625 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
627 let req = tl::functions::auth::SignIn {
628 phone_number: token.phone.clone(),
629 phone_code_hash: token.phone_code_hash.clone(),
630 phone_code: Some(code.trim().to_string()),
631 email_verification: None,
632 };
633
634 let body = match self.rpc_call_raw(&req).await {
635 Ok(b) => b,
636 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
637 let dc_id = r.value.unwrap_or(2) as i32;
638 self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
639 self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
640 }
641 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
642 let t = self.get_password_info().await.map_err(SignInError::Other)?;
643 return Err(SignInError::PasswordRequired(t));
644 }
645 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
646 Err(e) => return Err(SignInError::Other(e)),
647 };
648
649 let mut cur = Cursor::from_slice(&body);
650 match tl::enums::auth::Authorization::deserialize(&mut cur)
651 .map_err(|e| SignInError::Other(e.into()))?
652 {
653 tl::enums::auth::Authorization::Authorization(a) => {
654 self.cache_user(&a.user).await;
655 let name = Self::extract_user_name(&a.user);
656 log::info!("[layer] Signed in ✓ Welcome, {name}!");
657 Ok(name)
658 }
659 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
660 }
661 }
662
663 pub async fn check_password(
665 &self,
666 token: PasswordToken,
667 password: impl AsRef<[u8]>,
668 ) -> Result<String, InvocationError> {
669 let pw = token.password;
670 let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
671 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
672 let g_b = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
673 let a = pw.secure_random;
674 let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
675
676 let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
677 let req = tl::functions::auth::CheckPassword {
678 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
679 tl::types::InputCheckPasswordSrp {
680 srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
681 },
682 ),
683 };
684
685 let body = self.rpc_call_raw(&req).await?;
686 let mut cur = Cursor::from_slice(&body);
687 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
688 tl::enums::auth::Authorization::Authorization(a) => {
689 self.cache_user(&a.user).await;
690 let name = Self::extract_user_name(&a.user);
691 log::info!("[layer] 2FA ✓ Welcome, {name}!");
692 Ok(name)
693 }
694 tl::enums::auth::Authorization::SignUpRequired(_) =>
695 Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
696 }
697 }
698
699 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
701 let req = tl::functions::auth::LogOut {};
702 match self.rpc_call_raw(&req).await {
703 Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
704 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
705 Err(e) => Err(e),
706 }
707 }
708
709 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
713 let req = tl::functions::users::GetUsers {
714 id: vec![tl::enums::InputUser::UserSelf],
715 };
716 let body = self.rpc_call_raw(&req).await?;
717 let mut cur = Cursor::from_slice(&body);
718 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
719 self.cache_users_slice(&users).await;
720 users.into_iter().find_map(|u| match u {
721 tl::enums::User::User(u) => Some(u),
722 _ => None,
723 }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
724 }
725
726 pub fn stream_updates(&self) -> UpdateStream {
730 let (tx, rx) = mpsc::unbounded_channel();
731 let client = self.clone();
732 tokio::spawn(async move {
733 client.run_update_loop(tx).await;
734 });
735 UpdateStream { rx }
736 }
737
738 async fn run_update_loop(&self, tx: mpsc::UnboundedSender<update::Update>) {
739 loop {
740 let result = {
741 let mut conn = self.inner.conn.lock().await;
742 match tokio::time::timeout(Duration::from_secs(30), conn.recv_once()).await {
743 Ok(Ok(updates)) => Ok(updates),
744 Ok(Err(e)) => Err(e),
745 Err(_timeout) => {
746 let _ = conn.send_ping().await;
747 continue;
748 }
749 }
750 };
751
752 match result {
753 Ok(updates) => {
754 for u in updates { let _ = tx.send(u); }
755 }
756 Err(e) => {
757 log::warn!("[layer] Update loop error: {e} — reconnecting …");
758 sleep(Duration::from_secs(1)).await;
759 let home_dc_id = *self.inner.home_dc_id.lock().await;
760 let (addr, saved_key, first_salt, time_offset) = {
761 let opts = self.inner.dc_options.lock().await;
762 match opts.get(&home_dc_id) {
763 Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
764 None => ("149.154.167.51:443".to_string(), None, 0, 0),
765 }
766 };
767 let socks5 = self.inner.socks5.clone();
768 let transport = self.inner.transport.clone();
769
770 let new_conn_result = if let Some(key) = saved_key {
773 log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
774 match Connection::connect_with_key(
775 &addr, key, first_salt, time_offset,
776 socks5.as_ref(), &transport,
777 ).await {
778 Ok(c) => Ok(c),
779 Err(e2) => {
780 log::warn!("[layer] connect_with_key failed ({e2}), falling back to fresh DH …");
781 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await
782 }
783 }
784 } else {
785 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await
786 };
787
788 match new_conn_result {
789 Ok(new_conn) => {
790 *self.inner.conn.lock().await = new_conn;
791 if let Err(e2) = self.init_connection().await {
792 log::warn!("[layer] init_connection after reconnect failed: {e2}");
793 }
794 match self.get_difference().await {
796 Ok(missed) => {
797 for u in missed { let _ = tx.send(u); }
798 }
799 Err(e2) => log::warn!("[layer] getDifference after reconnect failed: {e2}"),
800 }
801 }
802 Err(e2) => {
803 log::error!("[layer] Reconnect failed: {e2}");
804 break;
805 }
806 }
807 }
808 }
809 }
810 }
811
812 pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
816 let p = self.resolve_peer(peer).await?;
817 self.send_message_to_peer(p, text).await
818 }
819
820 pub async fn send_message_to_peer(
822 &self,
823 peer: tl::enums::Peer,
824 text: &str,
825 ) -> Result<(), InvocationError> {
826 self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
827 }
828
829 pub async fn send_message_to_peer_ex(
831 &self,
832 peer: tl::enums::Peer,
833 msg: &InputMessage,
834 ) -> Result<(), InvocationError> {
835 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
836 let req = tl::functions::messages::SendMessage {
837 no_webpage: msg.no_webpage,
838 silent: msg.silent,
839 background: msg.background,
840 clear_draft: msg.clear_draft,
841 noforwards: false,
842 update_stickersets_order: false,
843 invert_media: false,
844 allow_paid_floodskip: false,
845 peer: input_peer,
846 reply_to: msg.reply_header(),
847 message: msg.text.clone(),
848 random_id: random_i64(),
849 reply_markup: msg.reply_markup.clone(),
850 entities: msg.entities.clone(),
851 schedule_date: msg.schedule_date,
852 schedule_repeat_period: None,
853 send_as: None,
854 quick_reply_shortcut: None,
855 effect: None,
856 allow_paid_stars: None,
857 suggested_post: None,
858 };
859 self.rpc_write(&req).await
860 }
861
862 pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
864 let req = tl::functions::messages::SendMessage {
865 no_webpage: false,
866 silent: false,
867 background: false,
868 clear_draft: false,
869 noforwards: false,
870 update_stickersets_order: false,
871 invert_media: false,
872 allow_paid_floodskip: false,
873 peer: tl::enums::InputPeer::PeerSelf,
874 reply_to: None,
875 message: text.to_string(),
876 random_id: random_i64(),
877 reply_markup: None,
878 entities: None,
879 schedule_date: None,
880 schedule_repeat_period: None,
881 send_as: None,
882 quick_reply_shortcut: None,
883 effect: None,
884 allow_paid_stars: None,
885 suggested_post: None,
886 };
887 self.rpc_write(&req).await
888 }
889
890 pub async fn edit_message(
892 &self,
893 peer: tl::enums::Peer,
894 message_id: i32,
895 new_text: &str,
896 ) -> Result<(), InvocationError> {
897 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
898 let req = tl::functions::messages::EditMessage {
899 no_webpage: false,
900 invert_media: false,
901 peer: input_peer,
902 id: message_id,
903 message: Some(new_text.to_string()),
904 media: None,
905 reply_markup: None,
906 entities: None,
907 schedule_date: None,
908 schedule_repeat_period: None,
909 quick_reply_shortcut_id: None,
910 };
911 self.rpc_write(&req).await
912 }
913
914 pub async fn forward_messages(
916 &self,
917 destination: tl::enums::Peer,
918 message_ids: &[i32],
919 source: tl::enums::Peer,
920 ) -> Result<(), InvocationError> {
921 let cache = self.inner.peer_cache.lock().await;
922 let to_peer = cache.peer_to_input(&destination);
923 let from_peer = cache.peer_to_input(&source);
924 drop(cache);
925
926 let req = tl::functions::messages::ForwardMessages {
927 silent: false,
928 background: false,
929 with_my_score: false,
930 drop_author: false,
931 drop_media_captions: false,
932 noforwards: false,
933 from_peer: from_peer,
934 id: message_ids.to_vec(),
935 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
936 to_peer: to_peer,
937 top_msg_id: None,
938 reply_to: None,
939 schedule_date: None,
940 schedule_repeat_period: None,
941 send_as: None,
942 quick_reply_shortcut: None,
943 effect: None,
944 video_timestamp: None,
945 allow_paid_stars: None,
946 allow_paid_floodskip: false,
947 suggested_post: None,
948 };
949 self.rpc_write(&req).await
950 }
951
952 pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
954 let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
955 self.rpc_write(&req).await
956 }
957
958 pub async fn get_messages_by_id(
960 &self,
961 peer: tl::enums::Peer,
962 ids: &[i32],
963 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
964 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
965 let id_list: Vec<tl::enums::InputMessage> = ids.iter()
966 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
967 .collect();
968 let req = tl::functions::channels::GetMessages {
969 channel: match &input_peer {
970 tl::enums::InputPeer::Channel(c) =>
971 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
972 channel_id: c.channel_id, access_hash: c.access_hash
973 }),
974 _ => return self.get_messages_user(input_peer, id_list).await,
975 },
976 id: id_list,
977 };
978 let body = self.rpc_call_raw(&req).await?;
979 let mut cur = Cursor::from_slice(&body);
980 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
981 tl::enums::messages::Messages::Messages(m) => m.messages,
982 tl::enums::messages::Messages::Slice(m) => m.messages,
983 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
984 tl::enums::messages::Messages::NotModified(_) => vec![],
985 };
986 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
987 }
988
989 async fn get_messages_user(
990 &self,
991 _peer: tl::enums::InputPeer,
992 ids: Vec<tl::enums::InputMessage>,
993 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
994 let req = tl::functions::messages::GetMessages { id: ids };
995 let body = self.rpc_call_raw(&req).await?;
996 let mut cur = Cursor::from_slice(&body);
997 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
998 tl::enums::messages::Messages::Messages(m) => m.messages,
999 tl::enums::messages::Messages::Slice(m) => m.messages,
1000 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1001 tl::enums::messages::Messages::NotModified(_) => vec![],
1002 };
1003 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1004 }
1005
1006 pub async fn get_pinned_message(
1008 &self,
1009 peer: tl::enums::Peer,
1010 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
1011 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1012 let req = tl::functions::messages::Search {
1013 peer: input_peer,
1014 q: String::new(),
1015 from_id: None,
1016 saved_peer_id: None,
1017 saved_reaction: None,
1018 top_msg_id: None,
1019 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
1020 min_date: 0,
1021 max_date: 0,
1022 offset_id: 0,
1023 add_offset: 0,
1024 limit: 1,
1025 max_id: 0,
1026 min_id: 0,
1027 hash: 0,
1028 };
1029 let body = self.rpc_call_raw(&req).await?;
1030 let mut cur = Cursor::from_slice(&body);
1031 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1032 tl::enums::messages::Messages::Messages(m) => m.messages,
1033 tl::enums::messages::Messages::Slice(m) => m.messages,
1034 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1035 tl::enums::messages::Messages::NotModified(_) => vec![],
1036 };
1037 Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
1038 }
1039
1040 pub async fn pin_message(
1042 &self,
1043 peer: tl::enums::Peer,
1044 message_id: i32,
1045 silent: bool,
1046 unpin: bool,
1047 pm_oneside: bool,
1048 ) -> Result<(), InvocationError> {
1049 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1050 let req = tl::functions::messages::UpdatePinnedMessage {
1051 silent,
1052 unpin,
1053 pm_oneside,
1054 peer: input_peer,
1055 id: message_id,
1056 };
1057 self.rpc_write(&req).await
1058 }
1059
1060 pub async fn unpin_message(
1062 &self,
1063 peer: tl::enums::Peer,
1064 message_id: i32,
1065 ) -> Result<(), InvocationError> {
1066 self.pin_message(peer, message_id, true, true, false).await
1067 }
1068
1069 pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1071 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1072 let req = tl::functions::messages::UnpinAllMessages {
1073 peer: input_peer,
1074 top_msg_id: None,
1075 saved_peer_id: None,
1076 };
1077 self.rpc_write(&req).await
1078 }
1079
1080 pub async fn search_messages(
1084 &self,
1085 peer: tl::enums::Peer,
1086 query: &str,
1087 limit: i32,
1088 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1089 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1090 let req = tl::functions::messages::Search {
1091 peer: input_peer,
1092 q: query.to_string(),
1093 from_id: None,
1094 saved_peer_id: None,
1095 saved_reaction: None,
1096 top_msg_id: None,
1097 filter: tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1098 min_date: 0,
1099 max_date: 0,
1100 offset_id: 0,
1101 add_offset: 0,
1102 limit,
1103 max_id: 0,
1104 min_id: 0,
1105 hash: 0,
1106 };
1107 let body = self.rpc_call_raw(&req).await?;
1108 let mut cur = Cursor::from_slice(&body);
1109 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1110 tl::enums::messages::Messages::Messages(m) => m.messages,
1111 tl::enums::messages::Messages::Slice(m) => m.messages,
1112 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1113 tl::enums::messages::Messages::NotModified(_) => vec![],
1114 };
1115 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1116 }
1117
1118 pub async fn search_global(
1120 &self,
1121 query: &str,
1122 limit: i32,
1123 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1124 let req = tl::functions::messages::SearchGlobal {
1125 broadcasts_only: false,
1126 groups_only: false,
1127 users_only: false,
1128 folder_id: None,
1129 q: query.to_string(),
1130 filter: tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1131 min_date: 0,
1132 max_date: 0,
1133 offset_rate: 0,
1134 offset_peer: tl::enums::InputPeer::Empty,
1135 offset_id: 0,
1136 limit,
1137 };
1138 let body = self.rpc_call_raw(&req).await?;
1139 let mut cur = Cursor::from_slice(&body);
1140 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1141 tl::enums::messages::Messages::Messages(m) => m.messages,
1142 tl::enums::messages::Messages::Slice(m) => m.messages,
1143 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1144 tl::enums::messages::Messages::NotModified(_) => vec![],
1145 };
1146 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1147 }
1148
1149 pub async fn get_scheduled_messages(
1166 &self,
1167 peer: tl::enums::Peer,
1168 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1169 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1170 let req = tl::functions::messages::GetScheduledHistory {
1171 peer: input_peer,
1172 hash: 0,
1173 };
1174 let body = self.rpc_call_raw(&req).await?;
1175 let mut cur = Cursor::from_slice(&body);
1176 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1177 tl::enums::messages::Messages::Messages(m) => m.messages,
1178 tl::enums::messages::Messages::Slice(m) => m.messages,
1179 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1180 tl::enums::messages::Messages::NotModified(_) => vec![],
1181 };
1182 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1183 }
1184
1185 pub async fn delete_scheduled_messages(
1187 &self,
1188 peer: tl::enums::Peer,
1189 ids: Vec<i32>,
1190 ) -> Result<(), InvocationError> {
1191 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1192 let req = tl::functions::messages::DeleteScheduledMessages {
1193 peer: input_peer,
1194 id: ids,
1195 };
1196 self.rpc_write(&req).await
1197 }
1198
1199 pub async fn answer_callback_query(
1202 &self,
1203 query_id: i64,
1204 text: Option<&str>,
1205 alert: bool,
1206 ) -> Result<bool, InvocationError> {
1207 let req = tl::functions::messages::SetBotCallbackAnswer {
1208 alert,
1209 query_id,
1210 message: text.map(|s| s.to_string()),
1211 url: None,
1212 cache_time: 0,
1213 };
1214 let body = self.rpc_call_raw(&req).await?;
1215 Ok(!body.is_empty())
1216 }
1217
1218 pub async fn answer_inline_query(
1219 &self,
1220 query_id: i64,
1221 results: Vec<tl::enums::InputBotInlineResult>,
1222 cache_time: i32,
1223 is_personal: bool,
1224 next_offset: Option<String>,
1225 ) -> Result<bool, InvocationError> {
1226 let req = tl::functions::messages::SetInlineBotResults {
1227 gallery: false,
1228 private: is_personal,
1229 query_id,
1230 results,
1231 cache_time,
1232 next_offset,
1233 switch_pm: None,
1234 switch_webview: None,
1235 };
1236 let body = self.rpc_call_raw(&req).await?;
1237 Ok(!body.is_empty())
1238 }
1239
1240 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
1244 let req = tl::functions::messages::GetDialogs {
1245 exclude_pinned: false,
1246 folder_id: None,
1247 offset_date: 0,
1248 offset_id: 0,
1249 offset_peer: tl::enums::InputPeer::Empty,
1250 limit,
1251 hash: 0,
1252 };
1253
1254 let body = self.rpc_call_raw(&req).await?;
1255 let mut cur = Cursor::from_slice(&body);
1256 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1257 tl::enums::messages::Dialogs::Dialogs(d) => d,
1258 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
1259 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1260 },
1261 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1262 };
1263
1264 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1266 .filter_map(|m| {
1267 let id = match &m {
1268 tl::enums::Message::Message(x) => x.id,
1269 tl::enums::Message::Service(x) => x.id,
1270 tl::enums::Message::Empty(x) => x.id,
1271 };
1272 Some((id, m))
1273 })
1274 .collect();
1275
1276 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1278 .filter_map(|u| {
1279 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1280 })
1281 .collect();
1282
1283 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1285 .filter_map(|c| {
1286 let id = match &c {
1287 tl::enums::Chat::Chat(x) => x.id,
1288 tl::enums::Chat::Forbidden(x) => x.id,
1289 tl::enums::Chat::Channel(x) => x.id,
1290 tl::enums::Chat::ChannelForbidden(x) => x.id,
1291 tl::enums::Chat::Empty(x) => x.id,
1292 };
1293 Some((id, c))
1294 })
1295 .collect();
1296
1297 {
1299 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1300 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1301 self.cache_users_slice(&u_list).await;
1302 self.cache_chats_slice(&c_list).await;
1303 }
1304
1305 let result = raw.dialogs.into_iter().map(|d| {
1306 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1307 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1308
1309 let message = msg_map.get(&top_id).cloned();
1310 let entity = peer.and_then(|p| match p {
1311 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1312 _ => None,
1313 });
1314 let chat = peer.and_then(|p| match p {
1315 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
1316 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1317 _ => None,
1318 });
1319
1320 Dialog { raw: d, message, entity, chat }
1321 }).collect();
1322
1323 Ok(result)
1324 }
1325
1326 async fn get_dialogs_raw(
1328 &self,
1329 req: tl::functions::messages::GetDialogs,
1330 ) -> Result<Vec<Dialog>, InvocationError> {
1331 let body = self.rpc_call_raw(&req).await?;
1332 let mut cur = Cursor::from_slice(&body);
1333 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1334 tl::enums::messages::Dialogs::Dialogs(d) => d,
1335 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
1336 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1337 },
1338 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1339 };
1340
1341 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1342 .filter_map(|m| {
1343 let id = match &m {
1344 tl::enums::Message::Message(x) => x.id,
1345 tl::enums::Message::Service(x) => x.id,
1346 tl::enums::Message::Empty(x) => x.id,
1347 };
1348 Some((id, m))
1349 })
1350 .collect();
1351
1352 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1353 .filter_map(|u| {
1354 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1355 })
1356 .collect();
1357
1358 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1359 .filter_map(|c| {
1360 let id = match &c {
1361 tl::enums::Chat::Chat(x) => x.id,
1362 tl::enums::Chat::Forbidden(x) => x.id,
1363 tl::enums::Chat::Channel(x) => x.id,
1364 tl::enums::Chat::ChannelForbidden(x) => x.id,
1365 tl::enums::Chat::Empty(x) => x.id,
1366 };
1367 Some((id, c))
1368 })
1369 .collect();
1370
1371 {
1372 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1373 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1374 self.cache_users_slice(&u_list).await;
1375 self.cache_chats_slice(&c_list).await;
1376 }
1377
1378 let result = raw.dialogs.into_iter().map(|d| {
1379 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1380 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1381
1382 let message = msg_map.get(&top_id).cloned();
1383 let entity = peer.and_then(|p| match p {
1384 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1385 _ => None,
1386 });
1387 let chat = peer.and_then(|p| match p {
1388 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
1389 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1390 _ => None,
1391 });
1392
1393 Dialog { raw: d, message, entity, chat }
1394 }).collect();
1395
1396 Ok(result)
1397 }
1398 pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1399 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1400 let req = tl::functions::messages::DeleteHistory {
1401 just_clear: false,
1402 revoke: false,
1403 peer: input_peer,
1404 max_id: 0,
1405 min_date: None,
1406 max_date: None,
1407 };
1408 self.rpc_write(&req).await
1409 }
1410
1411 pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1413 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1414 match &input_peer {
1415 tl::enums::InputPeer::Channel(c) => {
1416 let req = tl::functions::channels::ReadHistory {
1417 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1418 channel_id: c.channel_id, access_hash: c.access_hash,
1419 }),
1420 max_id: 0,
1421 };
1422 self.rpc_call_raw(&req).await?;
1423 }
1424 _ => {
1425 let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
1426 self.rpc_call_raw(&req).await?;
1427 }
1428 }
1429 Ok(())
1430 }
1431
1432 pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1434 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1435 let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
1436 self.rpc_write(&req).await
1437 }
1438
1439 pub async fn send_chat_action(
1445 &self,
1446 peer: tl::enums::Peer,
1447 action: tl::enums::SendMessageAction,
1448 ) -> Result<(), InvocationError> {
1449 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1450 let req = tl::functions::messages::SetTyping {
1451 peer: input_peer,
1452 top_msg_id: None,
1453 action,
1454 };
1455 self.rpc_write(&req).await
1456 }
1457
1458 pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1462 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1463 match input_peer {
1464 tl::enums::InputPeer::Channel(c) => {
1465 let req = tl::functions::channels::JoinChannel {
1466 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1467 channel_id: c.channel_id, access_hash: c.access_hash,
1468 }),
1469 };
1470 self.rpc_call_raw(&req).await?;
1471 }
1472 tl::enums::InputPeer::Chat(c) => {
1473 let req = tl::functions::messages::AddChatUser {
1474 chat_id: c.chat_id,
1475 user_id: tl::enums::InputUser::UserSelf,
1476 fwd_limit: 0,
1477 };
1478 self.rpc_call_raw(&req).await?;
1479 }
1480 _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
1481 }
1482 Ok(())
1483 }
1484
1485 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
1487 let hash = Self::parse_invite_hash(link)
1488 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
1489 let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
1490 self.rpc_write(&req).await
1491 }
1492
/// Extracts the invite hash from a `…/+HASH` or `…/joinchat/HASH` link.
///
/// The `/+` form takes priority when both markers occur. Surrounding
/// whitespace is ignored, and the hash ends at the first `/`, `?` or `#`,
/// so a trailing slash, query string or fragment is not part of it.
///
/// # Returns
///
/// `Some(hash)` borrowed from `link`, or `None` when neither marker is
/// present or the hash would be empty.
pub fn parse_invite_hash(link: &str) -> Option<&str> {
    const MARKERS: [&str; 2] = ["/+", "/joinchat/"];

    let link = link.trim();
    let rest = MARKERS
        .iter()
        .find_map(|marker| link.find(marker).map(|pos| &link[pos + marker.len()..]))?;

    // Previously everything after the marker was returned verbatim,
    // including `?query`, `#fragment` or a trailing `/`.
    let end = rest
        .find(|c: char| c == '/' || c == '?' || c == '#')
        .unwrap_or(rest.len());
    let hash = &rest[..end];

    if hash.is_empty() {
        None
    } else {
        Some(hash)
    }
}
1503
1504 pub async fn get_messages(
1508 &self,
1509 peer: tl::enums::InputPeer,
1510 limit: i32,
1511 offset_id: i32,
1512 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1513 let req = tl::functions::messages::GetHistory {
1514 peer, offset_id, offset_date: 0, add_offset: 0,
1515 limit, max_id: 0, min_id: 0, hash: 0,
1516 };
1517 let body = self.rpc_call_raw(&req).await?;
1518 let mut cur = Cursor::from_slice(&body);
1519 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1520 tl::enums::messages::Messages::Messages(m) => m.messages,
1521 tl::enums::messages::Messages::Slice(m) => m.messages,
1522 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1523 tl::enums::messages::Messages::NotModified(_) => vec![],
1524 };
1525 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1526 }
1527
1528 pub async fn resolve_peer(
1532 &self,
1533 peer: &str,
1534 ) -> Result<tl::enums::Peer, InvocationError> {
1535 match peer.trim() {
1536 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
1537 username if username.starts_with('@') => {
1538 self.resolve_username(&username[1..]).await
1539 }
1540 id_str => {
1541 if let Ok(id) = id_str.parse::<i64>() {
1542 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
1543 } else {
1544 Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
1545 }
1546 }
1547 }
1548 }
1549
1550 async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
1551 let req = tl::functions::contacts::ResolveUsername {
1552 username: username.to_string(), referer: None,
1553 };
1554 let body = self.rpc_call_raw(&req).await?;
1555 let mut cur = Cursor::from_slice(&body);
1556 let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
1557 tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
1558 };
1559 self.cache_users_slice(&resolved.users).await;
1561 self.cache_chats_slice(&resolved.chats).await;
1562 Ok(resolved.peer)
1563 }
1564
1565 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
1569 let body = self.rpc_call_raw(req).await?;
1570 let mut cur = Cursor::from_slice(&body);
1571 R::Return::deserialize(&mut cur).map_err(Into::into)
1572 }
1573
1574 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1575 let mut fail_count = NonZeroU32::new(1).unwrap();
1576 let mut slept_so_far = Duration::default();
1577 loop {
1578 match self.do_rpc_call(req).await {
1579 Ok(body) => return Ok(body),
1580 Err(e) => {
1581 let ctx = RetryContext { fail_count, slept_so_far, error: e };
1582 match self.inner.retry_policy.should_retry(&ctx) {
1583 ControlFlow::Continue(delay) => {
1584 sleep(delay).await;
1585 slept_so_far += delay;
1586 fail_count = fail_count.saturating_add(1);
1587 }
1588 ControlFlow::Break(()) => return Err(ctx.error),
1589 }
1590 }
1591 }
1592 }
1593 }
1594
1595 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1596 let mut side_updates = Vec::new();
1597 let result = {
1598 let mut conn = self.inner.conn.lock().await;
1599 conn.rpc_call(req, &mut side_updates).await
1600 };
1601 for u in side_updates {
1602 let _ = self.inner.update_tx.send(u);
1603 }
1604 result
1605 }
1606
1607 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
1611 let mut fail_count = NonZeroU32::new(1).unwrap();
1612 let mut slept_so_far = Duration::default();
1613 loop {
1614 let mut side_updates = Vec::new();
1615 let result = {
1616 let mut conn = self.inner.conn.lock().await;
1617 conn.rpc_call_ack(req, &mut side_updates).await
1618 };
1619 for u in side_updates {
1620 let _ = self.inner.update_tx.send(u);
1621 }
1622 match result {
1623 Ok(()) => return Ok(()),
1624 Err(e) => {
1625 let ctx = RetryContext { fail_count, slept_so_far, error: e };
1626 match self.inner.retry_policy.should_retry(&ctx) {
1627 ControlFlow::Continue(delay) => {
1628 sleep(delay).await;
1629 slept_so_far += delay;
1630 fail_count = fail_count.saturating_add(1);
1631 }
1632 ControlFlow::Break(()) => return Err(ctx.error),
1633 }
1634 }
1635 }
1636 }
1637 }
1638
1639 async fn init_connection(&self) -> Result<(), InvocationError> {
1642 use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
1643 let req = InvokeWithLayer {
1644 layer: tl::LAYER,
1645 query: InitConnection {
1646 api_id: self.inner.api_id,
1647 device_model: "Linux".to_string(),
1648 system_version: "1.0".to_string(),
1649 app_version: env!("CARGO_PKG_VERSION").to_string(),
1650 system_lang_code: "en".to_string(),
1651 lang_pack: "".to_string(),
1652 lang_code: "en".to_string(),
1653 proxy: None,
1654 params: None,
1655 query: GetConfig {},
1656 },
1657 };
1658
1659 let body = {
1660 let mut side_updates = Vec::new();
1661 let result = {
1662 let mut conn = self.inner.conn.lock().await;
1663 conn.rpc_call_serializable(&req, &mut side_updates).await
1664 };
1665 for u in side_updates {
1666 let _ = self.inner.update_tx.send(u);
1667 }
1668 result?
1669 };
1670
1671 let mut cur = Cursor::from_slice(&body);
1672 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
1673 let allow_ipv6 = self.inner.allow_ipv6;
1674 let mut opts = self.inner.dc_options.lock().await;
1675 for opt in &cfg.dc_options {
1676 let tl::enums::DcOption::DcOption(o) = opt;
1677 if o.media_only || o.cdn || o.tcpo_only { continue; }
1678 if o.ipv6 && !allow_ipv6 { continue; }
1679 let addr = format!("{}:{}", o.ip_address, o.port);
1680 let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
1681 dc_id: o.id, addr: addr.clone(),
1682 auth_key: None, first_salt: 0, time_offset: 0,
1683 });
1684 entry.addr = addr;
1685 }
1686 log::info!("[layer] initConnection ✓ ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
1687 }
1688 Ok(())
1689 }
1690
1691 async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
1694 let addr = {
1695 let opts = self.inner.dc_options.lock().await;
1696 opts.get(&new_dc_id).map(|e| e.addr.clone())
1697 .unwrap_or_else(|| "149.154.167.51:443".to_string())
1698 };
1699 log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
1700
1701 let saved_key = {
1702 let opts = self.inner.dc_options.lock().await;
1703 opts.get(&new_dc_id).and_then(|e| e.auth_key)
1704 };
1705
1706 let socks5 = self.inner.socks5.clone();
1707 let transport = self.inner.transport.clone();
1708 let conn = if let Some(key) = saved_key {
1709 Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
1710 } else {
1711 Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
1712 };
1713
1714 let new_key = conn.auth_key_bytes();
1715 {
1716 let mut opts = self.inner.dc_options.lock().await;
1717 let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
1718 dc_id: new_dc_id, addr: addr.clone(),
1719 auth_key: None, first_salt: 0, time_offset: 0,
1720 });
1721 entry.auth_key = Some(new_key);
1722 }
1723
1724 *self.inner.conn.lock().await = conn;
1725 *self.inner.home_dc_id.lock().await = new_dc_id;
1726 self.init_connection().await?;
1727 log::info!("[layer] Now on DC{new_dc_id} ✓");
1728 Ok(())
1729 }
1730
1731 async fn cache_user(&self, user: &tl::enums::User) {
1734 self.inner.peer_cache.lock().await.cache_user(user);
1735 }
1736
1737 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
1738 let mut cache = self.inner.peer_cache.lock().await;
1739 cache.cache_users(users);
1740 }
1741
1742 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
1743 let mut cache = self.inner.peer_cache.lock().await;
1744 cache.cache_chats(chats);
1745 }
1746
1747 #[doc(hidden)]
1749 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
1750 self.cache_users_slice(users).await;
1751 }
1752
1753 #[doc(hidden)]
1754 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
1755 self.cache_chats_slice(chats).await;
1756 }
1757
1758 #[doc(hidden)]
1760 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1761 self.rpc_call_raw(req).await
1762 }
1763
1764 pub fn iter_dialogs(&self) -> DialogIter {
1781 DialogIter {
1782 offset_date: 0,
1783 offset_id: 0,
1784 offset_peer: tl::enums::InputPeer::Empty,
1785 done: false,
1786 buffer: VecDeque::new(),
1787 }
1788 }
1789
1790 pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
1804 MessageIter {
1805 peer,
1806 offset_id: 0,
1807 done: false,
1808 buffer: VecDeque::new(),
1809 }
1810 }
1811
1812 pub async fn resolve_to_input_peer(
1817 &self,
1818 peer: &tl::enums::Peer,
1819 ) -> Result<tl::enums::InputPeer, InvocationError> {
1820 let cache = self.inner.peer_cache.lock().await;
1821 match peer {
1822 tl::enums::Peer::User(u) => {
1823 if u.user_id == 0 {
1824 return Ok(tl::enums::InputPeer::PeerSelf);
1825 }
1826 match cache.users.get(&u.user_id) {
1827 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
1828 user_id: u.user_id, access_hash: hash,
1829 })),
1830 None => Err(InvocationError::Deserialize(format!(
1831 "access_hash unknown for user {}; resolve via username first", u.user_id
1832 ))),
1833 }
1834 }
1835 tl::enums::Peer::Chat(c) => {
1836 Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
1837 }
1838 tl::enums::Peer::Channel(c) => {
1839 match cache.channels.get(&c.channel_id) {
1840 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
1841 channel_id: c.channel_id, access_hash: hash,
1842 })),
1843 None => Err(InvocationError::Deserialize(format!(
1844 "access_hash unknown for channel {}; resolve via username first", c.channel_id
1845 ))),
1846 }
1847 }
1848 }
1849 }
1850
1851 pub async fn invoke_on_dc<R: RemoteCall>(
1859 &self,
1860 dc_id: i32,
1861 req: &R,
1862 ) -> Result<R::Return, InvocationError> {
1863 let body = self.rpc_on_dc_raw(dc_id, req).await?;
1864 let mut cur = Cursor::from_slice(&body);
1865 R::Return::deserialize(&mut cur).map_err(Into::into)
1866 }
1867
1868 async fn rpc_on_dc_raw<R: RemoteCall>(
1870 &self,
1871 dc_id: i32,
1872 req: &R,
1873 ) -> Result<Vec<u8>, InvocationError> {
1874 let needs_new = {
1876 let pool = self.inner.dc_pool.lock().await;
1877 !pool.has_connection(dc_id)
1878 };
1879
1880 if needs_new {
1881 let addr = {
1882 let opts = self.inner.dc_options.lock().await;
1883 opts.get(&dc_id).map(|e| e.addr.clone())
1884 .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
1885 };
1886
1887 let socks5 = self.inner.socks5.clone();
1888 let transport = self.inner.transport.clone();
1889 let saved_key = {
1890 let opts = self.inner.dc_options.lock().await;
1891 opts.get(&dc_id).and_then(|e| e.auth_key)
1892 };
1893
1894 let dc_conn = if let Some(key) = saved_key {
1895 dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
1896 } else {
1897 let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
1898 let home_dc_id = *self.inner.home_dc_id.lock().await;
1900 if dc_id != home_dc_id {
1901 if let Err(e) = self.export_import_auth(dc_id, &conn).await {
1902 log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
1903 }
1904 }
1905 conn
1906 };
1907
1908 let key = dc_conn.auth_key_bytes();
1909 {
1910 let mut opts = self.inner.dc_options.lock().await;
1911 if let Some(e) = opts.get_mut(&dc_id) {
1912 e.auth_key = Some(key);
1913 }
1914 }
1915 self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
1916 }
1917
1918 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
1919 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
1920 }
1921
1922 async fn export_import_auth(
1924 &self,
1925 dc_id: i32,
1926 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
1928 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
1930 let body = self.rpc_call_raw(&export_req).await?;
1931 let mut cur = Cursor::from_slice(&body);
1932 let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
1933 tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
1934 };
1935
1936 let import_req = tl::functions::auth::ImportAuthorization {
1938 id: exported.id,
1939 bytes: exported.bytes,
1940 };
1941 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
1942 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
1943 log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
1944 Ok(())
1945 }
1946
1947 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
1950 let body = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
1951 let mut cur = Cursor::from_slice(&body);
1952 let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
1953 tl::enums::account::Password::Password(p) => p,
1954 };
1955 Ok(PasswordToken { password: pw })
1956 }
1957
1958 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
1959 tl::functions::auth::SendCode {
1960 phone_number: phone.to_string(),
1961 api_id: self.inner.api_id,
1962 api_hash: self.inner.api_hash.clone(),
1963 settings: tl::enums::CodeSettings::CodeSettings(
1964 tl::types::CodeSettings {
1965 allow_flashcall: false, current_number: false, allow_app_hash: false,
1966 allow_missed_call: false, allow_firebase: false, unknown_number: false,
1967 logout_tokens: None, token: None, app_sandbox: None,
1968 },
1969 ),
1970 }
1971 }
1972
1973 fn extract_user_name(user: &tl::enums::User) -> String {
1974 match user {
1975 tl::enums::User::User(u) => {
1976 format!("{} {}",
1977 u.first_name.as_deref().unwrap_or(""),
1978 u.last_name.as_deref().unwrap_or(""))
1979 .trim().to_string()
1980 }
1981 tl::enums::User::Empty(_) => "(unknown)".into(),
1982 }
1983 }
1984
1985 fn extract_password_params(
1986 algo: &tl::enums::PasswordKdfAlgo,
1987 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
1988 match algo {
1989 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
1990 Ok((&a.salt1, &a.salt2, &a.p, a.g))
1991 }
1992 _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
1993 }
1994 }
1995}
1996
/// Paginated cursor over the account's dialogs, created by
/// `Client::iter_dialogs` and advanced with [`DialogIter::next`].
pub struct DialogIter {
    /// Sent as `offset_date` in the next `messages.getDialogs` request.
    offset_date: i32,
    /// Sent as `offset_id` in the next request.
    offset_id: i32,
    /// Sent as `offset_peer` in the next request.
    offset_peer: tl::enums::InputPeer,
    /// Set once a page shorter than the page size has been received.
    done: bool,
    /// Dialogs already fetched but not yet handed out.
    buffer: VecDeque<Dialog>,
}
2007
2008impl DialogIter {
2009 const PAGE_SIZE: i32 = 100;
2010
2011 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
2013 if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
2014 if self.done { return Ok(None); }
2015
2016 let req = tl::functions::messages::GetDialogs {
2017 exclude_pinned: false,
2018 folder_id: None,
2019 offset_date: self.offset_date,
2020 offset_id: self.offset_id,
2021 offset_peer: self.offset_peer.clone(),
2022 limit: Self::PAGE_SIZE,
2023 hash: 0,
2024 };
2025
2026 let dialogs = client.get_dialogs_raw(req).await?;
2027 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
2028 self.done = true;
2029 }
2030
2031 if let Some(last) = dialogs.last() {
2033 self.offset_date = last.message.as_ref().map(|m| match m {
2034 tl::enums::Message::Message(x) => x.date,
2035 tl::enums::Message::Service(x) => x.date,
2036 _ => 0,
2037 }).unwrap_or(0);
2038 self.offset_id = last.top_message();
2039 if let Some(peer) = last.peer() {
2040 self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
2041 }
2042 }
2043
2044 self.buffer.extend(dialogs);
2045 Ok(self.buffer.pop_front())
2046 }
2047}
2048
/// Paginated cursor over one peer's message history, created by
/// `Client::iter_messages` and advanced with [`MessageIter::next`].
pub struct MessageIter {
    /// Peer whose history is being read.
    peer: tl::enums::Peer,
    /// Passed as `offset_id` when fetching the next page.
    offset_id: i32,
    /// Set once a page shorter than the page size has been received.
    done: bool,
    /// Messages already fetched but not yet handed out.
    buffer: VecDeque<update::IncomingMessage>,
}
2056
2057impl MessageIter {
2058 const PAGE_SIZE: i32 = 100;
2059
2060 pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
2062 if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
2063 if self.done { return Ok(None); }
2064
2065 let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
2066 let page = client.get_messages(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
2067
2068 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
2069 self.done = true;
2070 }
2071 if let Some(last) = page.last() {
2072 self.offset_id = last.id();
2073 }
2074
2075 self.buffer.extend(page);
2076 Ok(self.buffer.pop_front())
2077 }
2078}
2079
2080#[doc(hidden)]
2084pub fn random_i64_pub() -> i64 { random_i64() }
2085
/// Framing used on a connection, chosen by
/// `Connection::apply_transport_init` from the configured transport.
enum FrameKind {
    /// Chosen for the abridged transport and for the obfuscated transport.
    Abridged,
    /// Chosen for the intermediate transport.
    Intermediate,
    /// Chosen for the full transport; both counters are initialised to 0.
    #[allow(dead_code)]
    Full { send_seqno: u32, recv_seqno: u32 },
}
2095
/// One connection to a data centre: the TCP stream, the encrypted session
/// state and the framing in use.
struct Connection {
    /// TCP stream that frames are sent over.
    stream: TcpStream,
    /// Encrypted session used to pack outgoing requests.
    enc: EncryptedSession,
    /// Framing selected when the transport was initialised.
    frame_kind: FrameKind,
}
2101
2102impl Connection {
2103 async fn open_stream(
2105 addr: &str,
2106 socks5: Option<&crate::socks5::Socks5Config>,
2107 transport: &TransportKind,
2108 ) -> Result<(TcpStream, FrameKind), InvocationError> {
2109 let stream = match socks5 {
2110 Some(proxy) => proxy.connect(addr).await?,
2111 None => TcpStream::connect(addr).await?,
2112 };
2113 Self::apply_transport_init(stream, transport).await
2114 }
2115
2116 async fn apply_transport_init(
2118 mut stream: TcpStream,
2119 transport: &TransportKind,
2120 ) -> Result<(TcpStream, FrameKind), InvocationError> {
2121 match transport {
2122 TransportKind::Abridged => {
2123 stream.write_all(&[0xef]).await?;
2124 Ok((stream, FrameKind::Abridged))
2125 }
2126 TransportKind::Intermediate => {
2127 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
2128 Ok((stream, FrameKind::Intermediate))
2129 }
2130 TransportKind::Full => {
2131 Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
2133 }
2134 TransportKind::Obfuscated { secret } => {
2135 let mut nonce = [0u8; 64];
2144 getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
2145 let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
2147 let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
2148 let mut handshake = nonce;
2150 handshake[56] = 0xef; handshake[57] = 0xef;
2151 handshake[58] = 0xef; handshake[59] = 0xef;
2152 enc_cipher.apply(&mut handshake[56..]);
2153 stream.write_all(&handshake).await?;
2154 Ok((stream, FrameKind::Abridged))
2155 }
2156 }
2157 }
2158
2159 async fn connect_raw(
2160 addr: &str,
2161 socks5: Option<&crate::socks5::Socks5Config>,
2162 transport: &TransportKind,
2163 ) -> Result<Self, InvocationError> {
2164 log::info!("[layer] Connecting to {addr} (DH) …");
2165
2166 let addr2 = addr.to_string();
2170 let socks5_c = socks5.cloned();
2171 let transport_c = transport.clone();
2172
2173 let fut = async move {
2174 let (mut stream, frame_kind) =
2175 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2176
2177 let mut plain = Session::new();
2178
2179 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2180 send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
2181 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
2182
2183 let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2184 send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
2185 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
2186
2187 let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2188 send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
2189 let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;
2190
2191 let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2192 log::info!("[layer] DH complete ✓");
2193
2194 Ok::<Self, InvocationError>(Self {
2195 stream,
2196 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
2197 frame_kind,
2198 })
2199 };
2200
2201 tokio::time::timeout(Duration::from_secs(15), fut)
2202 .await
2203 .map_err(|_| InvocationError::Deserialize(
2204 format!("DH handshake with {addr} timed out after 15 s")
2205 ))?
2206 }
2207
2208 async fn connect_with_key(
2209 addr: &str,
2210 auth_key: [u8; 256],
2211 first_salt: i64,
2212 time_offset: i32,
2213 socks5: Option<&crate::socks5::Socks5Config>,
2214 transport: &TransportKind,
2215 ) -> Result<Self, InvocationError> {
2216 let addr2 = addr.to_string();
2217 let socks5_c = socks5.cloned();
2218 let transport_c = transport.clone();
2219
2220 let fut = async move {
2221 let (stream, frame_kind) =
2222 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2223 Ok::<Self, InvocationError>(Self {
2224 stream,
2225 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
2226 frame_kind,
2227 })
2228 };
2229
2230 tokio::time::timeout(Duration::from_secs(15), fut)
2231 .await
2232 .map_err(|_| InvocationError::Deserialize(
2233 format!("connect_with_key to {addr} timed out after 15 s")
2234 ))?
2235 }
2236
2237 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
2238 fn first_salt(&self) -> i64 { self.enc.salt }
2239 fn time_offset(&self) -> i32 { self.enc.time_offset }
2240
2241 async fn rpc_call<R: RemoteCall>(
2242 &mut self,
2243 req: &R,
2244 side_updates: &mut Vec<update::Update>,
2245 ) -> Result<Vec<u8>, InvocationError> {
2246 let wire = self.enc.pack(req);
2247 send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2248 tokio::time::timeout(Duration::from_secs(10), self.recv_rpc_with_updates(side_updates))
2249 .await
2250 .map_err(|_| InvocationError::Deserialize("rpc_call timed out after 10 s".into()))?
2251 }
2252
2253 async fn rpc_call_serializable<S: tl::Serializable>(
2254 &mut self,
2255 req: &S,
2256 side_updates: &mut Vec<update::Update>,
2257 ) -> Result<Vec<u8>, InvocationError> {
2258 let wire = self.enc.pack_serializable(req);
2259 send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2260 tokio::time::timeout(Duration::from_secs(10), self.recv_rpc_with_updates(side_updates))
2261 .await
2262 .map_err(|_| InvocationError::Deserialize("rpc_call_serializable timed out after 10 s".into()))?
2263 }
2264
2265 async fn rpc_call_ack<S: tl::Serializable>(
2270 &mut self,
2271 req: &S,
2272 side_updates: &mut Vec<update::Update>,
2273 ) -> Result<(), InvocationError> {
2274 let wire = self.enc.pack_serializable(req);
2275 send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2276 tokio::time::timeout(Duration::from_secs(10), self.recv_ack(side_updates))
2277 .await
2278 .map_err(|_| InvocationError::Deserialize("rpc_call_ack timed out after 10 s".into()))?
2279 }
2280
2281 async fn recv_ack(
2282 &mut self,
2283 side_updates: &mut Vec<update::Update>,
2284 ) -> Result<(), InvocationError> {
2285 loop {
2286 let mut raw = recv_frame(&mut self.stream, &mut self.frame_kind).await?;
2287 let msg = self.enc.unpack(&mut raw)
2288 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2289 if msg.salt != 0 { self.enc.salt = msg.salt; }
2290 match unwrap_envelope(msg.body)? {
2291 EnvelopeResult::Payload(_) => return Ok(()),
2292 EnvelopeResult::Updates(us) => {
2293 side_updates.extend(us);
2294 return Ok(());
2295 }
2296 EnvelopeResult::None => {}
2297 }
2298 }
2299 }
2300
2301 async fn recv_rpc_with_updates(
2304 &mut self,
2305 side_updates: &mut Vec<update::Update>,
2306 ) -> Result<Vec<u8>, InvocationError> {
2307 loop {
2308 let mut raw = recv_frame(&mut self.stream, &mut self.frame_kind).await?;
2309 let msg = self.enc.unpack(&mut raw)
2310 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2311 if msg.salt != 0 { self.enc.salt = msg.salt; }
2312 match unwrap_envelope(msg.body)? {
2313 EnvelopeResult::Payload(p) => return Ok(p),
2314 EnvelopeResult::Updates(us) => {
2315 log::debug!("[layer] {} updates interleaved during RPC", us.len());
2316 side_updates.extend(us);
2317 }
2318 EnvelopeResult::None => {}
2319 }
2320 }
2321 }
2322
2323 #[allow(dead_code)]
2325 async fn recv_rpc(&mut self) -> Result<Vec<u8>, InvocationError> {
2326 let mut _discard = Vec::new();
2327 self.recv_rpc_with_updates(&mut _discard).await
2328 }
2329
2330 async fn recv_once(&mut self) -> Result<Vec<update::Update>, InvocationError> {
2331 let mut raw = recv_frame(&mut self.stream, &mut self.frame_kind).await?;
2332 let msg = self.enc.unpack(&mut raw)
2333 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2334 if msg.salt != 0 { self.enc.salt = msg.salt; }
2335 match unwrap_envelope(msg.body)? {
2336 EnvelopeResult::Updates(us) => Ok(us),
2337 _ => Ok(vec![]),
2338 }
2339 }
2340
2341 async fn send_ping(&mut self) -> Result<(), InvocationError> {
2342 let req = tl::functions::Ping { ping_id: random_i64() };
2343 let wire = self.enc.pack(&req);
2344 send_frame(&mut self.stream, &wire, &self.frame_kind).await?;
2345 Ok(())
2346 }
2347}
2348
2349async fn send_frame(
2353 stream: &mut TcpStream,
2354 data: &[u8],
2355 kind: &FrameKind,
2356) -> Result<(), InvocationError> {
2357 match kind {
2358 FrameKind::Abridged => send_abridged(stream, data).await,
2359 FrameKind::Intermediate => {
2360 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2361 stream.write_all(data).await?;
2362 Ok(())
2363 }
2364 FrameKind::Full { .. } => {
2365 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2371 stream.write_all(data).await?;
2372 Ok(())
2373 }
2374 }
2375}
2376
2377async fn recv_frame(
2379 stream: &mut TcpStream,
2380 kind: &mut FrameKind,
2381) -> Result<Vec<u8>, InvocationError> {
2382 match kind {
2383 FrameKind::Abridged => recv_abridged(stream).await,
2384 FrameKind::Intermediate | FrameKind::Full { .. } => {
2385 let mut len_buf = [0u8; 4];
2386 stream.read_exact(&mut len_buf).await?;
2387 let len = u32::from_le_bytes(len_buf) as usize;
2388 let mut buf = vec![0u8; len];
2389 stream.read_exact(&mut buf).await?;
2390 Ok(buf)
2391 }
2392 }
2393}
2394
2395async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
2397 let words = data.len() / 4;
2398 if words < 0x7f {
2399 stream.write_all(&[words as u8]).await?;
2400 } else {
2401 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
2402 stream.write_all(&b).await?;
2403 }
2404 stream.write_all(data).await?;
2405 Ok(())
2406}
2407
2408async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
2409 let mut h = [0u8; 1];
2410 stream.read_exact(&mut h).await?;
2411 let words = if h[0] < 0x7f {
2412 h[0] as usize
2413 } else {
2414 let mut b = [0u8; 3];
2415 stream.read_exact(&mut b).await?;
2416 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
2417 if w == 1 {
2419 let mut code_buf = [0u8; 4];
2420 stream.read_exact(&mut code_buf).await?;
2421 let code = i32::from_le_bytes(code_buf);
2422 return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
2423 }
2424 w
2425 };
2426 if words == 0 || words > 0x8000 {
2429 return Err(InvocationError::Deserialize(
2430 format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
2431 ));
2432 }
2433 let mut buf = vec![0u8; words * 4];
2434 stream.read_exact(&mut buf).await?;
2435 Ok(buf)
2436}
2437
2438async fn recv_frame_plain<T: Deserializable>(
2440 stream: &mut TcpStream,
2441 _kind: &FrameKind,
2442) -> Result<T, InvocationError> {
2443 let raw = recv_abridged(stream).await?; if raw.len() < 20 {
2445 return Err(InvocationError::Deserialize("plaintext frame too short".into()));
2446 }
2447 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
2448 return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
2449 }
2450 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
2451 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
2452 T::deserialize(&mut cur).map_err(Into::into)
2453}
2454
2455enum EnvelopeResult {
2458 Payload(Vec<u8>),
2459 Updates(Vec<update::Update>),
2460 None,
2461}
2462
2463fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
2464 if body.len() < 4 {
2465 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
2466 }
2467 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2468
2469 match cid {
2470 ID_RPC_RESULT => {
2471 if body.len() < 12 {
2472 return Err(InvocationError::Deserialize("rpc_result too short".into()));
2473 }
2474 unwrap_envelope(body[12..].to_vec())
2475 }
2476 ID_RPC_ERROR => {
2477 if body.len() < 8 {
2478 return Err(InvocationError::Deserialize("rpc_error too short".into()));
2479 }
2480 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
2481 let message = tl_read_string(&body[8..]).unwrap_or_default();
2482 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
2483 }
2484 ID_MSG_CONTAINER => {
2485 if body.len() < 8 {
2486 return Err(InvocationError::Deserialize("container too short".into()));
2487 }
2488 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
2489 let mut pos = 8usize;
2490 let mut payload: Option<Vec<u8>> = None;
2491 let mut updates_buf: Vec<update::Update> = Vec::new();
2492
2493 for _ in 0..count {
2494 if pos + 16 > body.len() { break; }
2495 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
2496 pos += 16;
2497 if pos + inner_len > body.len() { break; }
2498 let inner = body[pos..pos + inner_len].to_vec();
2499 pos += inner_len;
2500 match unwrap_envelope(inner)? {
2501 EnvelopeResult::Payload(p) => { payload = Some(p); }
2502 EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
2503 EnvelopeResult::None => {}
2504 }
2505 }
2506 if let Some(p) = payload {
2507 Ok(EnvelopeResult::Payload(p))
2508 } else if !updates_buf.is_empty() {
2509 Ok(EnvelopeResult::Updates(updates_buf))
2510 } else {
2511 Ok(EnvelopeResult::None)
2512 }
2513 }
2514 ID_GZIP_PACKED => {
2515 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
2516 unwrap_envelope(gz_inflate(&bytes)?)
2517 }
2518 ID_PONG | ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
2520 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
2533 Ok(EnvelopeResult::None)
2534 }
2535 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
2536 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
2537 | ID_UPDATES_TOO_LONG => {
2538 Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
2539 }
2540 _ => Ok(EnvelopeResult::Payload(body)),
2541 }
2542}
2543
2544fn random_i64() -> i64 {
2547 let mut b = [0u8; 8];
2548 getrandom::getrandom(&mut b).expect("getrandom");
2549 i64::from_le_bytes(b)
2550}
2551
/// Extracts a length-prefixed byte string from the start of `data`.
///
/// A first byte below 254 is the length itself and the content starts at offset 1.
/// Any other first byte selects the long form: the next three bytes are a 24-bit
/// little-endian length and the content starts at offset 4. Padding after the
/// content is not examined.
///
/// Returns `Some(vec![])` for empty input, and `None` when the long-form header is
/// incomplete or the declared length runs past the end of `data`.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let first = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };

    let (len, start) = match first {
        0..=253 => (usize::from(first), 1),
        _ => {
            if data.len() < 4 {
                return None;
            }
            let len = usize::from(data[1])
                | (usize::from(data[2]) << 8)
                | (usize::from(data[3]) << 16);
            (len, 4)
        }
    };

    // `get` yields `None` when the declared range exceeds the buffer.
    data.get(start..start + len).map(|content| content.to_vec())
}
2561
2562fn tl_read_string(data: &[u8]) -> Option<String> {
2563 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
2564}
2565
2566fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
2567 use std::io::Read;
2568 let mut out = Vec::new();
2569 if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
2570 return Ok(out);
2571 }
2572 out.clear();
2573 flate2::read::ZlibDecoder::new(data)
2574 .read_to_end(&mut out)
2575 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
2576 Ok(out)
2577}