1#![deny(unsafe_code)]
21
22mod errors;
23mod retry;
24mod session;
25mod transport;
26mod two_factor_auth;
27pub mod update;
28pub mod parsers;
29pub mod media;
30pub mod participants;
31pub mod pts;
32
33pub mod dc_pool;
35pub mod transport_obfuscated;
36pub mod transport_intermediate;
37pub mod socks5;
38pub mod session_backend;
39pub mod inline_iter;
40pub mod typing_guard;
41
42pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
43pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
44pub use update::Update;
45pub use media::{UploadedFile, DownloadIter};
46pub use participants::Participant;
47pub use typing_guard::TypingGuard;
48pub use socks5::Socks5Config;
49pub use session_backend::{SessionBackend, BinaryFileBackend, InMemoryBackend};
50
51use std::collections::HashMap;
52use std::collections::VecDeque;
53use std::num::NonZeroU32;
54use std::ops::ControlFlow;
55use std::sync::Arc;
56use std::time::Duration;
57
58use layer_tl_types as tl;
59use layer_mtproto::{EncryptedSession, Session, authentication as auth};
60use layer_tl_types::{Cursor, Deserializable, RemoteCall};
61use session::{DcEntry, PersistedSession};
62use tokio::io::{AsyncReadExt, AsyncWriteExt};
63use tokio::net::TcpStream;
64use tokio::sync::{mpsc, oneshot, Mutex};
65use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
66use tokio::time::sleep;
67
// 32-bit constructor IDs found at the start of a decrypted message body.
// `Client::route_frame` reads the first four bytes of a body as a
// little-endian `u32` and dispatches on these values.

// Request/response plumbing.
const ID_RPC_RESULT: u32 = 0xf35c6d01;
const ID_RPC_ERROR: u32 = 0x2144ca19;
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc;
const ID_GZIP_PACKED: u32 = 0x3072cfa1;
// NOTE(review): ID_PONG, ID_MSGS_ACK, ID_NEW_SESSION and ID_BAD_MSG_NOTIFY are
// not matched by `route_frame` in this part of the file — presumably handled
// (or intentionally ignored) elsewhere; confirm.
const ID_PONG: u32 = 0x347773c5;
const ID_MSGS_ACK: u32 = 0x62d6b459;
const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
const ID_NEW_SESSION: u32 = 0x9ec20908;
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811;
// Update containers; bodies starting with any of these are handed to
// `update::parse_updates`.
const ID_UPDATES: u32 = 0x74ae4240;
const ID_UPDATE_SHORT: u32 = 0x78d4dec1;
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
const ID_UPDATE_SHORT_CHAT_MSG: u32 = 0x4d6deea5;
const ID_UPDATES_TOO_LONG: u32 = 0xe317af7e;
85
/// Access hashes remembered for users and channels.
///
/// Entries are added by the `cache_*` methods and looked up when an
/// `InputPeer` has to be built from a bare `Peer`.
#[derive(Default)]
pub(crate) struct PeerCache {
    /// User ID → access hash, recorded for `User` objects that carry one.
    pub(crate) users: HashMap<i64, i64>,
    /// Channel ID → access hash, recorded from `Channel` and
    /// `ChannelForbidden` chats.
    pub(crate) channels: HashMap<i64, i64>,
}
97
98impl PeerCache {
99 fn cache_user(&mut self, user: &tl::enums::User) {
100 if let tl::enums::User::User(u) = user {
101 if let Some(hash) = u.access_hash {
102 self.users.insert(u.id, hash);
103 }
104 }
105 }
106
107 fn cache_chat(&mut self, chat: &tl::enums::Chat) {
108 match chat {
109 tl::enums::Chat::Channel(c) => {
110 if let Some(hash) = c.access_hash {
111 self.channels.insert(c.id, hash);
112 }
113 }
114 tl::enums::Chat::ChannelForbidden(c) => {
115 self.channels.insert(c.id, c.access_hash);
116 }
117 _ => {}
118 }
119 }
120
121 fn cache_users(&mut self, users: &[tl::enums::User]) {
122 for u in users { self.cache_user(u); }
123 }
124
125 fn cache_chats(&mut self, chats: &[tl::enums::Chat]) {
126 for c in chats { self.cache_chat(c); }
127 }
128
129 fn user_input_peer(&self, user_id: i64) -> tl::enums::InputPeer {
130 if user_id == 0 {
131 return tl::enums::InputPeer::PeerSelf;
132 }
133 let hash = self.users.get(&user_id).copied().unwrap_or(0);
134 tl::enums::InputPeer::User(tl::types::InputPeerUser { user_id, access_hash: hash })
135 }
136
137 fn channel_input_peer(&self, channel_id: i64) -> tl::enums::InputPeer {
138 let hash = self.channels.get(&channel_id).copied().unwrap_or(0);
139 tl::enums::InputPeer::Channel(tl::types::InputPeerChannel { channel_id, access_hash: hash })
140 }
141
142 fn peer_to_input(&self, peer: &tl::enums::Peer) -> tl::enums::InputPeer {
143 match peer {
144 tl::enums::Peer::User(u) => self.user_input_peer(u.user_id),
145 tl::enums::Peer::Chat(c) => tl::enums::InputPeer::Chat(
146 tl::types::InputPeerChat { chat_id: c.chat_id }
147 ),
148 tl::enums::Peer::Channel(c) => self.channel_input_peer(c.channel_id),
149 }
150 }
151}
152
/// Description of an outgoing message.
///
/// Built with [`InputMessage::text`] and the chained setter methods, then
/// passed to `Client::send_message_to_peer_ex`, which copies the fields into
/// a `messages.SendMessage` request.
#[derive(Clone, Default)]
pub struct InputMessage {
    /// Message body, sent as the request's `message` field.
    pub text: String,
    /// ID of the message being replied to, if any.
    pub reply_to: Option<i32>,
    /// Forwarded to the request's `silent` flag.
    pub silent: bool,
    /// Forwarded to the request's `background` flag.
    pub background: bool,
    /// Forwarded to the request's `clear_draft` flag.
    pub clear_draft: bool,
    /// Forwarded to the request's `no_webpage` flag.
    pub no_webpage: bool,
    /// Optional formatting entities for `text`.
    pub entities: Option<Vec<tl::enums::MessageEntity>>,
    /// Optional reply markup attached to the message.
    pub reply_markup: Option<tl::enums::ReplyMarkup>,
    /// Optional value for the request's `schedule_date` field.
    pub schedule_date: Option<i32>,
}
176
177impl InputMessage {
178 pub fn text(text: impl Into<String>) -> Self {
180 Self { text: text.into(), ..Default::default() }
181 }
182
183 pub fn set_text(mut self, text: impl Into<String>) -> Self {
185 self.text = text.into(); self
186 }
187
188 pub fn reply_to(mut self, id: Option<i32>) -> Self {
190 self.reply_to = id; self
191 }
192
193 pub fn silent(mut self, v: bool) -> Self {
195 self.silent = v; self
196 }
197
198 pub fn background(mut self, v: bool) -> Self {
200 self.background = v; self
201 }
202
203 pub fn clear_draft(mut self, v: bool) -> Self {
205 self.clear_draft = v; self
206 }
207
208 pub fn no_webpage(mut self, v: bool) -> Self {
210 self.no_webpage = v; self
211 }
212
213 pub fn entities(mut self, e: Vec<tl::enums::MessageEntity>) -> Self {
215 self.entities = Some(e); self
216 }
217
218 pub fn reply_markup(mut self, rm: tl::enums::ReplyMarkup) -> Self {
220 self.reply_markup = Some(rm); self
221 }
222
223 pub fn schedule_date(mut self, ts: Option<i32>) -> Self {
225 self.schedule_date = ts; self
226 }
227
228 fn reply_header(&self) -> Option<tl::enums::InputReplyTo> {
229 self.reply_to.map(|id| {
230 tl::enums::InputReplyTo::Message(
231 tl::types::InputReplyToMessage {
232 reply_to_msg_id: id,
233 top_msg_id: None,
234 reply_to_peer_id: None,
235 quote_text: None,
236 quote_entities: None,
237 quote_offset: None,
238 monoforum_peer_id: None,
239 todo_item_id: None,
240 }
241 )
242 })
243 }
244}
245
246impl From<&str> for InputMessage {
247 fn from(s: &str) -> Self { Self::text(s) }
248}
249
250impl From<String> for InputMessage {
251 fn from(s: String) -> Self { Self::text(s) }
252}
253
/// Selects the transport used for connections.
///
/// The value is stored in [`Config::transport`] and passed by reference to
/// `Connection::connect_raw` / `Connection::connect_with_key`.
#[derive(Clone, Debug, Default)]
pub enum TransportKind {
    /// Default variant.
    #[default]
    Abridged,
    /// NOTE(review): presumably the MTProto "intermediate" framing (see the
    /// `transport_intermediate` module) — confirm.
    Intermediate,
    /// NOTE(review): presumably the MTProto "full" framing — confirm.
    Full,
    /// Obfuscated transport with an optional 16-byte secret; how the secret
    /// is used is decided by transport code outside this section.
    Obfuscated { secret: Option<[u8; 16]> },
}
287
/// Settings consumed by [`Client::connect`].
///
/// See the `Default` impl for the values used when a field is not set.
#[derive(Clone)]
pub struct Config {
    /// Application ID sent with sign-in requests.
    pub api_id: i32,
    /// Application hash sent with sign-in requests.
    pub api_hash: String,
    /// Optional data-centre address.
    /// NOTE(review): not read by `Client::connect` in this part of the file —
    /// confirm where (or whether) it is honoured.
    pub dc_addr: Option<String>,
    /// Retry policy stored on the client.
    pub retry_policy: Arc<dyn RetryPolicy>,
    /// Optional SOCKS5 proxy settings passed to every connection attempt.
    pub socks5: Option<crate::socks5::Socks5Config>,
    /// Stored on the client; its consumers are outside this section.
    pub allow_ipv6: bool,
    /// Transport used for every connection attempt.
    pub transport: TransportKind,
    /// Backend used to load the persisted session on connect and to store it
    /// in `Client::save_session`.
    pub session_backend: Arc<dyn crate::session_backend::SessionBackend>,
}
306
307impl Default for Config {
308 fn default() -> Self {
309 Self {
310 api_id: 0,
311 api_hash: String::new(),
312 dc_addr: None,
313 retry_policy: Arc::new(AutoSleep::default()),
314 socks5: None,
315 allow_ipv6: false,
316 transport: TransportKind::Abridged,
317 session_backend: Arc::new(crate::session_backend::BinaryFileBackend::new("layer.session")),
318 }
319 }
320}
321
/// Receiving end of an update feed, returned by `Client::stream_updates`.
pub struct UpdateStream {
    /// Channel on which forwarded updates arrive.
    rx: mpsc::UnboundedReceiver<update::Update>,
}
328
329impl UpdateStream {
330 pub async fn next(&mut self) -> Option<update::Update> {
332 self.rx.recv().await
333 }
334}
335
/// A dialog entry together with the objects associated with it.
#[derive(Debug, Clone)]
pub struct Dialog {
    /// The raw TL dialog (either a regular dialog or a folder).
    pub raw: tl::enums::Dialog,
    /// Message associated with the dialog, if one was provided.
    pub message: Option<tl::enums::Message>,
    /// User associated with the dialog, if any; preferred by [`Dialog::title`].
    pub entity: Option<tl::enums::User>,
    /// Chat or channel associated with the dialog, if any.
    pub chat: Option<tl::enums::Chat>,
}
346
347impl Dialog {
348 pub fn title(&self) -> String {
350 if let Some(tl::enums::User::User(u)) = &self.entity {
351 let first = u.first_name.as_deref().unwrap_or("");
352 let last = u.last_name.as_deref().unwrap_or("");
353 let name = format!("{first} {last}").trim().to_string();
354 if !name.is_empty() { return name; }
355 }
356 if let Some(chat) = &self.chat {
357 return match chat {
358 tl::enums::Chat::Chat(c) => c.title.clone(),
359 tl::enums::Chat::Forbidden(c) => c.title.clone(),
360 tl::enums::Chat::Channel(c) => c.title.clone(),
361 tl::enums::Chat::ChannelForbidden(c) => c.title.clone(),
362 tl::enums::Chat::Empty(_) => "(empty)".into(),
363 };
364 }
365 "(Unknown)".to_string()
366 }
367
368 pub fn peer(&self) -> Option<&tl::enums::Peer> {
370 match &self.raw {
371 tl::enums::Dialog::Dialog(d) => Some(&d.peer),
372 tl::enums::Dialog::Folder(_) => None,
373 }
374 }
375
376 pub fn unread_count(&self) -> i32 {
378 match &self.raw {
379 tl::enums::Dialog::Dialog(d) => d.unread_count,
380 _ => 0,
381 }
382 }
383
384 pub fn top_message(&self) -> i32 {
386 match &self.raw {
387 tl::enums::Dialog::Dialog(d) => d.top_message,
388 _ => 0,
389 }
390 }
391}
392
/// State shared by every clone of [`Client`].
struct ClientInner {
    /// Write side of the current connection, including its encryption state
    /// (`enc`). Replaced wholesale when a new connection is established.
    writer: Mutex<ConnectionWriter>,
    /// In-flight requests keyed by request message ID. `route_frame` removes
    /// the entry and completes the sender when the matching `rpc_result`
    /// arrives; the reader loop clears the map on a read error.
    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>>,
    /// Hands a new connection's read half, frame kind, auth key and session
    /// ID to the reader loop.
    reconnect_tx: mpsc::UnboundedSender<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    /// ID of the data centre the main connection targets.
    home_dc_id: Mutex<i32>,
    /// Known data centres keyed by DC ID.
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    /// Access-hash cache used to build `InputPeer` values.
    pub(crate) peer_cache: Mutex<PeerCache>,
    /// Update sequence state; starts at `PtsState::default()`.
    pub(crate) pts_state: Mutex<pts::PtsState>,
    // The following fields are copied from `Config` in `Client::connect`.
    api_id: i32,
    api_hash: String,
    retry_policy: Arc<dyn RetryPolicy>,
    socks5: Option<crate::socks5::Socks5Config>,
    allow_ipv6: bool,
    transport: TransportKind,
    session_backend: Arc<dyn crate::session_backend::SessionBackend>,
    /// Pool built from the DC table at connect time; `save_session` asks it
    /// to contribute its keys to the persisted DC list.
    dc_pool: Mutex<dc_pool::DcPool>,
    /// Sending side of the internal update channel; parsed updates are
    /// pushed here by the reader.
    update_tx: mpsc::UnboundedSender<update::Update>,
}
421
/// Handle to a connected client. Cloning is cheap: clones share the same
/// inner state through `Arc`s.
#[derive(Clone)]
pub struct Client {
    /// Shared connection and session state.
    pub(crate) inner: Arc<ClientInner>,
    /// Receiving side of the internal update channel; locked by the
    /// forwarding task spawned in `stream_updates`.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
428
429impl Client {
    /// Connects to Telegram and returns a ready-to-use client.
    ///
    /// Steps, in order:
    /// 1. Load the persisted session from `config.session_backend`. If it
    ///    contains an auth key for its home DC, try to connect with that key;
    ///    in every other case (no session, no key, or the keyed connect
    ///    failing) fall back to [`Self::fresh_connect`].
    /// 2. Build the shared [`ClientInner`] and spawn the reader task.
    /// 3. Call `init_connection`; if that fails, open a fresh connection,
    ///    swap it in, and try `init_connection` once more.
    /// 4. Call `sync_pts_state`, ignoring its result.
    ///
    /// # Errors
    ///
    /// Returns `InvocationError::Io` when loading the session fails, and
    /// propagates errors from the fresh connect and from the second
    /// `init_connection` attempt.
    pub async fn connect(config: Config) -> Result<Self, InvocationError> {
        let (update_tx, update_rx) = mpsc::unbounded_channel();

        // Cloned because `config` is moved field-by-field into `ClientInner` below.
        let socks5 = config.socks5.clone();
        let transport = config.transport.clone();

        let (conn, home_dc_id, dc_opts) =
            match config.session_backend.load()
                .map_err(InvocationError::Io)?
            {
                Some(s) => {
                    if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
                        if let Some(key) = dc.auth_key {
                            log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
                            match Connection::connect_with_key(
                                &dc.addr, key, dc.first_salt, dc.time_offset,
                                socks5.as_ref(), &transport,
                            ).await {
                                Ok(c) => {
                                    // Start from the built-in DC table, then let the
                                    // persisted entries override it.
                                    let mut opts = session::default_dc_addresses()
                                        .into_iter()
                                        .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
                                        .collect::<HashMap<_, _>>();
                                    for d in &s.dcs { opts.insert(d.dc_id, d.clone()); }
                                    (c, s.home_dc_id, opts)
                                }
                                Err(e) => {
                                    log::warn!("[layer] Session connect failed ({e}), fresh connect …");
                                    Self::fresh_connect(socks5.as_ref(), &transport).await?
                                }
                            }
                        } else {
                            // Home DC entry exists but has no auth key.
                            Self::fresh_connect(socks5.as_ref(), &transport).await?
                        }
                    } else {
                        // Session has no entry for its own home DC.
                        Self::fresh_connect(socks5.as_ref(), &transport).await?
                    }
                }
                None => Self::fresh_connect(socks5.as_ref(), &transport).await?,
            };

        let pool = dc_pool::DcPool::new(home_dc_id, &dc_opts.values().cloned().collect::<Vec<_>>());

        // Split the connection; the reader task needs the key and session ID
        // to decrypt incoming frames.
        let (writer, read_half, frame_kind) = conn.into_writer();
        let auth_key = writer.enc.auth_key_bytes();
        let session_id = writer.enc.session_id();

        let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Vec<u8>, InvocationError>>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Channel used to hand a replacement connection to the reader task.
        let (reconnect_tx, reconnect_rx) =
            mpsc::unbounded_channel::<(OwnedReadHalf, FrameKind, [u8; 256], i64)>();

        let inner = Arc::new(ClientInner {
            writer: Mutex::new(writer),
            pending: pending.clone(),
            reconnect_tx,
            home_dc_id: Mutex::new(home_dc_id),
            dc_options: Mutex::new(dc_opts),
            peer_cache: Mutex::new(PeerCache::default()),
            pts_state: Mutex::new(pts::PtsState::default()),
            api_id: config.api_id,
            api_hash: config.api_hash,
            retry_policy: config.retry_policy,
            socks5: config.socks5,
            allow_ipv6: config.allow_ipv6,
            transport: config.transport,
            session_backend: config.session_backend,
            dc_pool: Mutex::new(pool),
            update_tx: update_tx,
        });

        let client = Self {
            inner,
            _update_rx: Arc::new(Mutex::new(update_rx)),
        };

        {
            // The reader must be running before `init_connection` is called below.
            // NOTE(review): presumably because its response is delivered through
            // `route_frame` — confirm.
            let client_r = client.clone();
            tokio::spawn(async move {
                client_r.run_reader_task(
                    read_half, frame_kind, auth_key, session_id, reconnect_rx,
                ).await;
            });
        }

        if let Err(e) = client.init_connection().await {
            log::warn!("[layer] init_connection failed ({e}), retrying with fresh connect …");

            let socks5_r = client.inner.socks5.clone();
            let transport_r = client.inner.transport.clone();
            let (new_conn, new_dc_id, new_opts) =
                Self::fresh_connect(socks5_r.as_ref(), &transport_r).await?;

            {
                let mut dc_guard = client.inner.home_dc_id.lock().await;
                *dc_guard = new_dc_id;
            }
            {
                let mut opts_guard = client.inner.dc_options.lock().await;
                *opts_guard = new_opts;
            }

            // Swap in the new writer, then tell the reader task to switch to
            // the new read half.
            let (new_writer, new_read, new_fk) = new_conn.into_writer();
            let new_ak = new_writer.enc.auth_key_bytes();
            let new_sid = new_writer.enc.session_id();
            *client.inner.writer.lock().await = new_writer;
            let _ = client.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));

            client.init_connection().await?;
        }

        // Best effort: a failure to sync update state does not fail the connect.
        let _ = client.sync_pts_state().await;
        Ok(client)
    }
559
560 async fn fresh_connect(
561 socks5: Option<&crate::socks5::Socks5Config>,
562 transport: &TransportKind,
563 ) -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
564 log::info!("[layer] Fresh connect to DC2 …");
565 let conn = Connection::connect_raw("149.154.167.51:443", socks5, transport).await?;
566 let opts = session::default_dc_addresses()
567 .into_iter()
568 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
569 .collect();
570 Ok((conn, 2, opts))
571 }
572
573 pub async fn save_session(&self) -> Result<(), InvocationError> {
576 let writer_guard = self.inner.writer.lock().await;
577 let home_dc_id = *self.inner.home_dc_id.lock().await;
578 let dc_options = self.inner.dc_options.lock().await;
579
580 let mut dcs: Vec<DcEntry> = dc_options.values().map(|e| DcEntry {
581 dc_id: e.dc_id,
582 addr: e.addr.clone(),
583 auth_key: if e.dc_id == home_dc_id { Some(writer_guard.auth_key_bytes()) } else { e.auth_key },
584 first_salt: if e.dc_id == home_dc_id { writer_guard.first_salt() } else { e.first_salt },
585 time_offset: if e.dc_id == home_dc_id { writer_guard.time_offset() } else { e.time_offset },
586 }).collect();
587 self.inner.dc_pool.lock().await.collect_keys(&mut dcs);
589
590 self.inner.session_backend
591 .save(&PersistedSession { home_dc_id, dcs })
592 .map_err(InvocationError::Io)?;
593 log::info!("[layer] Session saved ✓");
594 Ok(())
595 }
596
597 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
601 match self.invoke(&tl::functions::updates::GetState {}).await {
602 Ok(_) => Ok(true),
603 Err(e) if e.is("AUTH_KEY_UNREGISTERED")
604 || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
605 Err(e) => Err(e),
606 }
607 }
608
609 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
611 let req = tl::functions::auth::ImportBotAuthorization {
612 flags: 0, api_id: self.inner.api_id,
613 api_hash: self.inner.api_hash.clone(),
614 bot_auth_token: token.to_string(),
615 };
616
617 let result = match self.invoke(&req).await {
618 Ok(x) => x,
619 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
620 let dc_id = r.value.unwrap_or(2) as i32;
621 self.migrate_to(dc_id).await?;
622 self.invoke(&req).await?
623 }
624 Err(e) => return Err(e),
625 };
626
627 let name = match result {
628 tl::enums::auth::Authorization::Authorization(a) => {
629 self.cache_user(&a.user).await;
630 Self::extract_user_name(&a.user)
631 }
632 tl::enums::auth::Authorization::SignUpRequired(_) => {
633 panic!("unexpected SignUpRequired during bot sign-in")
634 }
635 };
636 log::info!("[layer] Bot signed in ✓ ({name})");
637 Ok(name)
638 }
639
640 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
642 use tl::enums::auth::SentCode;
643
644 let req = self.make_send_code_req(phone);
645 let body = match self.rpc_call_raw(&req).await {
646 Ok(b) => b,
647 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
648 let dc_id = r.value.unwrap_or(2) as i32;
649 self.migrate_to(dc_id).await?;
650 self.rpc_call_raw(&req).await?
651 }
652 Err(e) => return Err(e),
653 };
654
655 let mut cur = Cursor::from_slice(&body);
656 let hash = match tl::enums::auth::SentCode::deserialize(&mut cur)? {
657 SentCode::SentCode(s) => s.phone_code_hash,
658 SentCode::Success(_) => return Err(InvocationError::Deserialize("unexpected Success".into())),
659 SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required to send code".into())),
660 };
661 log::info!("[layer] Login code sent");
662 Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
663 }
664
665 pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
667 let req = tl::functions::auth::SignIn {
668 phone_number: token.phone.clone(),
669 phone_code_hash: token.phone_code_hash.clone(),
670 phone_code: Some(code.trim().to_string()),
671 email_verification: None,
672 };
673
674 let body = match self.rpc_call_raw(&req).await {
675 Ok(b) => b,
676 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
677 let dc_id = r.value.unwrap_or(2) as i32;
678 self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
679 self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
680 }
681 Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
682 let t = self.get_password_info().await.map_err(SignInError::Other)?;
683 return Err(SignInError::PasswordRequired(t));
684 }
685 Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
686 Err(e) => return Err(SignInError::Other(e)),
687 };
688
689 let mut cur = Cursor::from_slice(&body);
690 match tl::enums::auth::Authorization::deserialize(&mut cur)
691 .map_err(|e| SignInError::Other(e.into()))?
692 {
693 tl::enums::auth::Authorization::Authorization(a) => {
694 self.cache_user(&a.user).await;
695 let name = Self::extract_user_name(&a.user);
696 log::info!("[layer] Signed in ✓ Welcome, {name}!");
697 Ok(name)
698 }
699 tl::enums::auth::Authorization::SignUpRequired(_) => Err(SignInError::SignUpRequired),
700 }
701 }
702
703 pub async fn check_password(
705 &self,
706 token: PasswordToken,
707 password: impl AsRef<[u8]>,
708 ) -> Result<String, InvocationError> {
709 let pw = token.password;
710 let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;
711 let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
712 let g_b = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
713 let a = pw.secure_random;
714 let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;
715
716 let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());
717 let req = tl::functions::auth::CheckPassword {
718 password: tl::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
719 tl::types::InputCheckPasswordSrp {
720 srp_id, a: g_a.to_vec(), m1: m1.to_vec(),
721 },
722 ),
723 };
724
725 let body = self.rpc_call_raw(&req).await?;
726 let mut cur = Cursor::from_slice(&body);
727 match tl::enums::auth::Authorization::deserialize(&mut cur)? {
728 tl::enums::auth::Authorization::Authorization(a) => {
729 self.cache_user(&a.user).await;
730 let name = Self::extract_user_name(&a.user);
731 log::info!("[layer] 2FA ✓ Welcome, {name}!");
732 Ok(name)
733 }
734 tl::enums::auth::Authorization::SignUpRequired(_) =>
735 Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into())),
736 }
737 }
738
739 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
741 let req = tl::functions::auth::LogOut {};
742 match self.rpc_call_raw(&req).await {
743 Ok(_) => { log::info!("[layer] Signed out ✓"); Ok(true) }
744 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
745 Err(e) => Err(e),
746 }
747 }
748
749 pub async fn get_me(&self) -> Result<tl::types::User, InvocationError> {
753 let req = tl::functions::users::GetUsers {
754 id: vec![tl::enums::InputUser::UserSelf],
755 };
756 let body = self.rpc_call_raw(&req).await?;
757 let mut cur = Cursor::from_slice(&body);
758 let users = Vec::<tl::enums::User>::deserialize(&mut cur)?;
759 self.cache_users_slice(&users).await;
760 users.into_iter().find_map(|u| match u {
761 tl::enums::User::User(u) => Some(u),
762 _ => None,
763 }).ok_or_else(|| InvocationError::Deserialize("getUsers returned no user".into()))
764 }
765
766 pub fn stream_updates(&self) -> UpdateStream {
774 let (caller_tx, rx) = mpsc::unbounded_channel::<update::Update>();
775 let internal_rx = self._update_rx.clone();
778 tokio::spawn(async move {
779 let mut guard = internal_rx.lock().await;
781 while let Some(upd) = guard.recv().await {
782 if caller_tx.send(upd).is_err() { break; }
783 }
784 });
785 UpdateStream { rx }
786 }
787
788 async fn run_reader_task(
801 &self,
802 read_half: OwnedReadHalf,
803 frame_kind: FrameKind,
804 auth_key: [u8; 256],
805 session_id: i64,
806 mut new_conn_rx: mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
807 ) {
808 self.reader_loop(read_half, frame_kind, auth_key, session_id, &mut new_conn_rx).await;
809 }
810
    /// Reads frames from the connection until it can no longer continue.
    ///
    /// Each iteration waits for whichever happens first:
    /// * a result from `recv_frame_with_keepalive` on the current read half,
    ///   or
    /// * a replacement connection arriving on `new_conn_rx`.
    ///
    /// `rh`, `fk`, `ak` and `sid` are the read half, frame kind, auth key and
    /// session ID of the connection currently being read; they are replaced
    /// whenever the loop switches connections.
    ///
    /// The loop ends when a reconnect attempt fails or when `new_conn_rx` is
    /// closed.
    async fn reader_loop(
        &self,
        mut rh: OwnedReadHalf,
        mut fk: FrameKind,
        mut ak: [u8; 256],
        mut sid: i64,
        new_conn_rx: &mut mpsc::UnboundedReceiver<(OwnedReadHalf, FrameKind, [u8; 256], i64)>,
    ) {
        loop {
            tokio::select! {
                outcome = recv_frame_with_keepalive(&mut rh, &fk, self, &ak) => {
                    match outcome {
                        FrameOutcome::Frame(mut raw) => {
                            // A frame that fails to decrypt is logged and skipped.
                            let msg = match EncryptedSession::decrypt_frame(&ak, sid, &mut raw) {
                                Ok(m) => m,
                                Err(e) => { log::warn!("[layer] Decrypt error: {e:?}"); continue; }
                            };
                            // Adopt the salt carried by the message for future writes.
                            if msg.salt != 0 {
                                self.inner.writer.lock().await.enc.salt = msg.salt;
                            }
                            self.route_frame(msg.body).await;
                        }
                        FrameOutcome::Error(e) => {
                            log::warn!("[layer] Reader: error: {e} — reconnecting …");
                            // Drop every pending oneshot sender; requests in
                            // flight cannot be answered on a dead connection.
                            self.inner.pending.lock().await.clear();
                            sleep(Duration::from_secs(1)).await;
                            match self.do_reconnect(&ak, &fk).await {
                                Ok((new_rh, new_fk, new_ak, new_sid)) => {
                                    rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
                                    // Fetch updates missed while disconnected in a
                                    // separate task so reading can resume at once.
                                    let c = self.clone();
                                    let utx = self.inner.update_tx.clone();
                                    tokio::spawn(async move {
                                        if let Ok(missed) = c.get_difference().await {
                                            for u in missed { let _ = utx.send(u); }
                                        }
                                    });
                                }
                                Err(e2) => { log::error!("[layer] Reconnect failed: {e2}"); break; }
                            }
                        }
                        FrameOutcome::Keepalive => {}
                    }
                }

                maybe = new_conn_rx.recv() => {
                    if let Some((new_rh, new_fk, new_ak, new_sid)) = maybe {
                        // Another part of the client replaced the connection.
                        rh = new_rh; fk = new_fk; ak = new_ak; sid = new_sid;
                        log::info!("[layer] Reader: switched to new connection");
                    } else {
                        // Sender dropped: nothing can hand over a connection any more.
                        break;
                    }
                }
            }
        }
    }
872
873 async fn route_frame(&self, body: Vec<u8>) {
875 if body.len() < 4 { return; }
876 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
877
878 match cid {
879 ID_RPC_RESULT => {
880 if body.len() < 12 { return; }
882 let req_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
883 let inner = body[12..].to_vec();
884 let result = unwrap_envelope(inner);
886 if let Some(tx) = self.inner.pending.lock().await.remove(&req_msg_id) {
887 let to_send = match result {
888 Ok(EnvelopeResult::Payload(p)) => Ok(p),
889 Ok(EnvelopeResult::Updates(us)) => {
890 for u in us { let _ = self.inner.update_tx.send(u); }
892 Ok(vec![])
893 }
894 Ok(EnvelopeResult::None) => Ok(vec![]),
895 Err(e) => Err(e),
896 };
897 let _ = tx.send(to_send);
898 }
899 }
900 ID_RPC_ERROR => {
904 log::warn!("[layer] Unexpected top-level rpc_error (no pending target)");
905 }
906 ID_MSG_CONTAINER => {
907 if body.len() < 8 { return; }
909 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
910 let mut pos = 8usize;
911 for _ in 0..count {
912 if pos + 16 > body.len() { break; }
913 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
914 pos += 16;
915 if pos + inner_len > body.len() { break; }
916 let inner = body[pos..pos + inner_len].to_vec();
917 pos += inner_len;
918 Box::pin(self.route_frame(inner)).await;
919 }
920 }
921 ID_GZIP_PACKED => {
922 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
923 if let Ok(inflated) = gz_inflate(&bytes) {
924 Box::pin(self.route_frame(inflated)).await;
925 }
926 }
927 ID_BAD_SERVER_SALT => {
928 if body.len() >= 16 {
930 let new_salt = i64::from_le_bytes(body[8..16].try_into().unwrap());
931 self.inner.writer.lock().await.enc.salt = new_salt;
932 }
933 }
934 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
935 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
936 | ID_UPDATES_TOO_LONG => {
937 for u in update::parse_updates(&body) {
938 let _ = self.inner.update_tx.send(u);
939 }
940 }
941 _ => {}
943 }
944 }
945
    /// Opens a replacement connection to the home DC and installs its writer.
    ///
    /// The address and any saved auth key come from the home DC's entry in
    /// `dc_options`; when there is no entry, the hard-coded DC2 address is
    /// used without a key. With a saved key, a keyed connect is tried first
    /// and a key-less `connect_raw` is the fallback.
    ///
    /// Returns the new read half, frame kind, auth key and session ID so the
    /// caller can switch its read state. Both parameters are currently
    /// unused.
    ///
    /// # Errors
    ///
    /// Propagates the error of the final `connect_raw` attempt. A failing
    /// `init_connection` is only logged.
    async fn do_reconnect(
        &self,
        _old_auth_key: &[u8; 256],
        _old_frame_kind: &FrameKind,
    ) -> Result<(OwnedReadHalf, FrameKind, [u8; 256], i64), InvocationError> {
        let home_dc_id = *self.inner.home_dc_id.lock().await;
        let (addr, saved_key, first_salt, time_offset) = {
            let opts = self.inner.dc_options.lock().await;
            match opts.get(&home_dc_id) {
                Some(e) => (e.addr.clone(), e.auth_key, e.first_salt, e.time_offset),
                None => ("149.154.167.51:443".to_string(), None, 0, 0),
            }
        };
        let socks5 = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();

        let new_conn = if let Some(key) = saved_key {
            log::info!("[layer] Reconnecting to DC{home_dc_id} with saved key …");
            match Connection::connect_with_key(
                &addr, key, first_salt, time_offset, socks5.as_ref(), &transport,
            ).await {
                Ok(c) => c,
                Err(e2) => {
                    log::warn!("[layer] connect_with_key failed ({e2}), fresh DH …");
                    Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
                }
            }
        } else {
            Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
        };

        // Install the new writer before returning the read half to the caller.
        let (new_writer, new_read, new_fk) = new_conn.into_writer();
        let new_ak = new_writer.enc.auth_key_bytes();
        let new_sid = new_writer.enc.session_id();
        *self.inner.writer.lock().await = new_writer;

        // NOTE(review): `reader_loop` awaits this function, so nothing reads
        // `new_read` while `init_connection` is in flight. If `init_connection`
        // waits for a response routed by the reader, it cannot complete here —
        // confirm it has a timeout or does not depend on the reader task.
        if let Err(e2) = self.init_connection().await {
            log::warn!("[layer] init_connection after reconnect failed: {e2}");
        }

        Ok((new_read, new_fk, new_ak, new_sid))
    }
989
990 pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
994 let p = self.resolve_peer(peer).await?;
995 self.send_message_to_peer(p, text).await
996 }
997
998 pub async fn send_message_to_peer(
1000 &self,
1001 peer: tl::enums::Peer,
1002 text: &str,
1003 ) -> Result<(), InvocationError> {
1004 self.send_message_to_peer_ex(peer, &InputMessage::text(text)).await
1005 }
1006
1007 pub async fn send_message_to_peer_ex(
1009 &self,
1010 peer: tl::enums::Peer,
1011 msg: &InputMessage,
1012 ) -> Result<(), InvocationError> {
1013 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1014 let req = tl::functions::messages::SendMessage {
1015 no_webpage: msg.no_webpage,
1016 silent: msg.silent,
1017 background: msg.background,
1018 clear_draft: msg.clear_draft,
1019 noforwards: false,
1020 update_stickersets_order: false,
1021 invert_media: false,
1022 allow_paid_floodskip: false,
1023 peer: input_peer,
1024 reply_to: msg.reply_header(),
1025 message: msg.text.clone(),
1026 random_id: random_i64(),
1027 reply_markup: msg.reply_markup.clone(),
1028 entities: msg.entities.clone(),
1029 schedule_date: msg.schedule_date,
1030 schedule_repeat_period: None,
1031 send_as: None,
1032 quick_reply_shortcut: None,
1033 effect: None,
1034 allow_paid_stars: None,
1035 suggested_post: None,
1036 };
1037 self.rpc_write(&req).await
1038 }
1039
1040 pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
1042 let req = tl::functions::messages::SendMessage {
1043 no_webpage: false,
1044 silent: false,
1045 background: false,
1046 clear_draft: false,
1047 noforwards: false,
1048 update_stickersets_order: false,
1049 invert_media: false,
1050 allow_paid_floodskip: false,
1051 peer: tl::enums::InputPeer::PeerSelf,
1052 reply_to: None,
1053 message: text.to_string(),
1054 random_id: random_i64(),
1055 reply_markup: None,
1056 entities: None,
1057 schedule_date: None,
1058 schedule_repeat_period: None,
1059 send_as: None,
1060 quick_reply_shortcut: None,
1061 effect: None,
1062 allow_paid_stars: None,
1063 suggested_post: None,
1064 };
1065 self.rpc_write(&req).await
1066 }
1067
1068 pub async fn edit_message(
1070 &self,
1071 peer: tl::enums::Peer,
1072 message_id: i32,
1073 new_text: &str,
1074 ) -> Result<(), InvocationError> {
1075 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1076 let req = tl::functions::messages::EditMessage {
1077 no_webpage: false,
1078 invert_media: false,
1079 peer: input_peer,
1080 id: message_id,
1081 message: Some(new_text.to_string()),
1082 media: None,
1083 reply_markup: None,
1084 entities: None,
1085 schedule_date: None,
1086 schedule_repeat_period: None,
1087 quick_reply_shortcut_id: None,
1088 };
1089 self.rpc_write(&req).await
1090 }
1091
1092 pub async fn forward_messages(
1094 &self,
1095 destination: tl::enums::Peer,
1096 message_ids: &[i32],
1097 source: tl::enums::Peer,
1098 ) -> Result<(), InvocationError> {
1099 let cache = self.inner.peer_cache.lock().await;
1100 let to_peer = cache.peer_to_input(&destination);
1101 let from_peer = cache.peer_to_input(&source);
1102 drop(cache);
1103
1104 let req = tl::functions::messages::ForwardMessages {
1105 silent: false,
1106 background: false,
1107 with_my_score: false,
1108 drop_author: false,
1109 drop_media_captions: false,
1110 noforwards: false,
1111 from_peer: from_peer,
1112 id: message_ids.to_vec(),
1113 random_id: (0..message_ids.len()).map(|_| random_i64()).collect(),
1114 to_peer: to_peer,
1115 top_msg_id: None,
1116 reply_to: None,
1117 schedule_date: None,
1118 schedule_repeat_period: None,
1119 send_as: None,
1120 quick_reply_shortcut: None,
1121 effect: None,
1122 video_timestamp: None,
1123 allow_paid_stars: None,
1124 allow_paid_floodskip: false,
1125 suggested_post: None,
1126 };
1127 self.rpc_write(&req).await
1128 }
1129
1130 pub async fn delete_messages(&self, message_ids: Vec<i32>, revoke: bool) -> Result<(), InvocationError> {
1132 let req = tl::functions::messages::DeleteMessages { revoke, id: message_ids };
1133 self.rpc_write(&req).await
1134 }
1135
1136 pub async fn get_messages_by_id(
1138 &self,
1139 peer: tl::enums::Peer,
1140 ids: &[i32],
1141 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1142 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1143 let id_list: Vec<tl::enums::InputMessage> = ids.iter()
1144 .map(|&id| tl::enums::InputMessage::Id(tl::types::InputMessageId { id }))
1145 .collect();
1146 let req = tl::functions::channels::GetMessages {
1147 channel: match &input_peer {
1148 tl::enums::InputPeer::Channel(c) =>
1149 tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1150 channel_id: c.channel_id, access_hash: c.access_hash
1151 }),
1152 _ => return self.get_messages_user(input_peer, id_list).await,
1153 },
1154 id: id_list,
1155 };
1156 let body = self.rpc_call_raw(&req).await?;
1157 let mut cur = Cursor::from_slice(&body);
1158 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1159 tl::enums::messages::Messages::Messages(m) => m.messages,
1160 tl::enums::messages::Messages::Slice(m) => m.messages,
1161 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1162 tl::enums::messages::Messages::NotModified(_) => vec![],
1163 };
1164 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1165 }
1166
1167 async fn get_messages_user(
1168 &self,
1169 _peer: tl::enums::InputPeer,
1170 ids: Vec<tl::enums::InputMessage>,
1171 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1172 let req = tl::functions::messages::GetMessages { id: ids };
1173 let body = self.rpc_call_raw(&req).await?;
1174 let mut cur = Cursor::from_slice(&body);
1175 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1176 tl::enums::messages::Messages::Messages(m) => m.messages,
1177 tl::enums::messages::Messages::Slice(m) => m.messages,
1178 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1179 tl::enums::messages::Messages::NotModified(_) => vec![],
1180 };
1181 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1182 }
1183
1184 pub async fn get_pinned_message(
1186 &self,
1187 peer: tl::enums::Peer,
1188 ) -> Result<Option<update::IncomingMessage>, InvocationError> {
1189 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1190 let req = tl::functions::messages::Search {
1191 peer: input_peer,
1192 q: String::new(),
1193 from_id: None,
1194 saved_peer_id: None,
1195 saved_reaction: None,
1196 top_msg_id: None,
1197 filter: tl::enums::MessagesFilter::InputMessagesFilterPinned,
1198 min_date: 0,
1199 max_date: 0,
1200 offset_id: 0,
1201 add_offset: 0,
1202 limit: 1,
1203 max_id: 0,
1204 min_id: 0,
1205 hash: 0,
1206 };
1207 let body = self.rpc_call_raw(&req).await?;
1208 let mut cur = Cursor::from_slice(&body);
1209 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1210 tl::enums::messages::Messages::Messages(m) => m.messages,
1211 tl::enums::messages::Messages::Slice(m) => m.messages,
1212 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1213 tl::enums::messages::Messages::NotModified(_) => vec![],
1214 };
1215 Ok(msgs.into_iter().next().map(update::IncomingMessage::from_raw))
1216 }
1217
1218 pub async fn pin_message(
1220 &self,
1221 peer: tl::enums::Peer,
1222 message_id: i32,
1223 silent: bool,
1224 unpin: bool,
1225 pm_oneside: bool,
1226 ) -> Result<(), InvocationError> {
1227 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1228 let req = tl::functions::messages::UpdatePinnedMessage {
1229 silent,
1230 unpin,
1231 pm_oneside,
1232 peer: input_peer,
1233 id: message_id,
1234 };
1235 self.rpc_write(&req).await
1236 }
1237
1238 pub async fn unpin_message(
1240 &self,
1241 peer: tl::enums::Peer,
1242 message_id: i32,
1243 ) -> Result<(), InvocationError> {
1244 self.pin_message(peer, message_id, true, true, false).await
1245 }
1246
1247 pub async fn unpin_all_messages(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1249 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1250 let req = tl::functions::messages::UnpinAllMessages {
1251 peer: input_peer,
1252 top_msg_id: None,
1253 saved_peer_id: None,
1254 };
1255 self.rpc_write(&req).await
1256 }
1257
1258 pub async fn search_messages(
1262 &self,
1263 peer: tl::enums::Peer,
1264 query: &str,
1265 limit: i32,
1266 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1267 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1268 let req = tl::functions::messages::Search {
1269 peer: input_peer,
1270 q: query.to_string(),
1271 from_id: None,
1272 saved_peer_id: None,
1273 saved_reaction: None,
1274 top_msg_id: None,
1275 filter: tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1276 min_date: 0,
1277 max_date: 0,
1278 offset_id: 0,
1279 add_offset: 0,
1280 limit,
1281 max_id: 0,
1282 min_id: 0,
1283 hash: 0,
1284 };
1285 let body = self.rpc_call_raw(&req).await?;
1286 let mut cur = Cursor::from_slice(&body);
1287 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1288 tl::enums::messages::Messages::Messages(m) => m.messages,
1289 tl::enums::messages::Messages::Slice(m) => m.messages,
1290 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1291 tl::enums::messages::Messages::NotModified(_) => vec![],
1292 };
1293 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1294 }
1295
1296 pub async fn search_global(
1298 &self,
1299 query: &str,
1300 limit: i32,
1301 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1302 let req = tl::functions::messages::SearchGlobal {
1303 broadcasts_only: false,
1304 groups_only: false,
1305 users_only: false,
1306 folder_id: None,
1307 q: query.to_string(),
1308 filter: tl::enums::MessagesFilter::InputMessagesFilterEmpty,
1309 min_date: 0,
1310 max_date: 0,
1311 offset_rate: 0,
1312 offset_peer: tl::enums::InputPeer::Empty,
1313 offset_id: 0,
1314 limit,
1315 };
1316 let body = self.rpc_call_raw(&req).await?;
1317 let mut cur = Cursor::from_slice(&body);
1318 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1319 tl::enums::messages::Messages::Messages(m) => m.messages,
1320 tl::enums::messages::Messages::Slice(m) => m.messages,
1321 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1322 tl::enums::messages::Messages::NotModified(_) => vec![],
1323 };
1324 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1325 }
1326
1327 pub async fn get_scheduled_messages(
1344 &self,
1345 peer: tl::enums::Peer,
1346 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1347 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1348 let req = tl::functions::messages::GetScheduledHistory {
1349 peer: input_peer,
1350 hash: 0,
1351 };
1352 let body = self.rpc_call_raw(&req).await?;
1353 let mut cur = Cursor::from_slice(&body);
1354 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1355 tl::enums::messages::Messages::Messages(m) => m.messages,
1356 tl::enums::messages::Messages::Slice(m) => m.messages,
1357 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1358 tl::enums::messages::Messages::NotModified(_) => vec![],
1359 };
1360 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1361 }
1362
1363 pub async fn delete_scheduled_messages(
1365 &self,
1366 peer: tl::enums::Peer,
1367 ids: Vec<i32>,
1368 ) -> Result<(), InvocationError> {
1369 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1370 let req = tl::functions::messages::DeleteScheduledMessages {
1371 peer: input_peer,
1372 id: ids,
1373 };
1374 self.rpc_write(&req).await
1375 }
1376
1377 pub async fn answer_callback_query(
1380 &self,
1381 query_id: i64,
1382 text: Option<&str>,
1383 alert: bool,
1384 ) -> Result<bool, InvocationError> {
1385 let req = tl::functions::messages::SetBotCallbackAnswer {
1386 alert,
1387 query_id,
1388 message: text.map(|s| s.to_string()),
1389 url: None,
1390 cache_time: 0,
1391 };
1392 let body = self.rpc_call_raw(&req).await?;
1393 Ok(!body.is_empty())
1394 }
1395
1396 pub async fn answer_inline_query(
1397 &self,
1398 query_id: i64,
1399 results: Vec<tl::enums::InputBotInlineResult>,
1400 cache_time: i32,
1401 is_personal: bool,
1402 next_offset: Option<String>,
1403 ) -> Result<bool, InvocationError> {
1404 let req = tl::functions::messages::SetInlineBotResults {
1405 gallery: false,
1406 private: is_personal,
1407 query_id,
1408 results,
1409 cache_time,
1410 next_offset,
1411 switch_pm: None,
1412 switch_webview: None,
1413 };
1414 let body = self.rpc_call_raw(&req).await?;
1415 Ok(!body.is_empty())
1416 }
1417
1418 pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
1422 let req = tl::functions::messages::GetDialogs {
1423 exclude_pinned: false,
1424 folder_id: None,
1425 offset_date: 0,
1426 offset_id: 0,
1427 offset_peer: tl::enums::InputPeer::Empty,
1428 limit,
1429 hash: 0,
1430 };
1431
1432 let body = self.rpc_call_raw(&req).await?;
1433 let mut cur = Cursor::from_slice(&body);
1434 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1435 tl::enums::messages::Dialogs::Dialogs(d) => d,
1436 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
1437 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1438 },
1439 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1440 };
1441
1442 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1444 .filter_map(|m| {
1445 let id = match &m {
1446 tl::enums::Message::Message(x) => x.id,
1447 tl::enums::Message::Service(x) => x.id,
1448 tl::enums::Message::Empty(x) => x.id,
1449 };
1450 Some((id, m))
1451 })
1452 .collect();
1453
1454 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1456 .filter_map(|u| {
1457 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1458 })
1459 .collect();
1460
1461 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1463 .filter_map(|c| {
1464 let id = match &c {
1465 tl::enums::Chat::Chat(x) => x.id,
1466 tl::enums::Chat::Forbidden(x) => x.id,
1467 tl::enums::Chat::Channel(x) => x.id,
1468 tl::enums::Chat::ChannelForbidden(x) => x.id,
1469 tl::enums::Chat::Empty(x) => x.id,
1470 };
1471 Some((id, c))
1472 })
1473 .collect();
1474
1475 {
1477 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1478 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1479 self.cache_users_slice(&u_list).await;
1480 self.cache_chats_slice(&c_list).await;
1481 }
1482
1483 let result = raw.dialogs.into_iter().map(|d| {
1484 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1485 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1486
1487 let message = msg_map.get(&top_id).cloned();
1488 let entity = peer.and_then(|p| match p {
1489 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1490 _ => None,
1491 });
1492 let chat = peer.and_then(|p| match p {
1493 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
1494 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1495 _ => None,
1496 });
1497
1498 Dialog { raw: d, message, entity, chat }
1499 }).collect();
1500
1501 Ok(result)
1502 }
1503
1504 async fn get_dialogs_raw(
1506 &self,
1507 req: tl::functions::messages::GetDialogs,
1508 ) -> Result<Vec<Dialog>, InvocationError> {
1509 let body = self.rpc_call_raw(&req).await?;
1510 let mut cur = Cursor::from_slice(&body);
1511 let raw = match tl::enums::messages::Dialogs::deserialize(&mut cur)? {
1512 tl::enums::messages::Dialogs::Dialogs(d) => d,
1513 tl::enums::messages::Dialogs::Slice(d) => tl::types::messages::Dialogs {
1514 dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
1515 },
1516 tl::enums::messages::Dialogs::NotModified(_) => return Ok(vec![]),
1517 };
1518
1519 let msg_map: HashMap<i32, tl::enums::Message> = raw.messages.into_iter()
1520 .filter_map(|m| {
1521 let id = match &m {
1522 tl::enums::Message::Message(x) => x.id,
1523 tl::enums::Message::Service(x) => x.id,
1524 tl::enums::Message::Empty(x) => x.id,
1525 };
1526 Some((id, m))
1527 })
1528 .collect();
1529
1530 let user_map: HashMap<i64, tl::enums::User> = raw.users.into_iter()
1531 .filter_map(|u| {
1532 if let tl::enums::User::User(ref uu) = u { Some((uu.id, u)) } else { None }
1533 })
1534 .collect();
1535
1536 let chat_map: HashMap<i64, tl::enums::Chat> = raw.chats.into_iter()
1537 .filter_map(|c| {
1538 let id = match &c {
1539 tl::enums::Chat::Chat(x) => x.id,
1540 tl::enums::Chat::Forbidden(x) => x.id,
1541 tl::enums::Chat::Channel(x) => x.id,
1542 tl::enums::Chat::ChannelForbidden(x) => x.id,
1543 tl::enums::Chat::Empty(x) => x.id,
1544 };
1545 Some((id, c))
1546 })
1547 .collect();
1548
1549 {
1550 let u_list: Vec<tl::enums::User> = user_map.values().cloned().collect();
1551 let c_list: Vec<tl::enums::Chat> = chat_map.values().cloned().collect();
1552 self.cache_users_slice(&u_list).await;
1553 self.cache_chats_slice(&c_list).await;
1554 }
1555
1556 let result = raw.dialogs.into_iter().map(|d| {
1557 let top_id = match &d { tl::enums::Dialog::Dialog(x) => x.top_message, _ => 0 };
1558 let peer = match &d { tl::enums::Dialog::Dialog(x) => Some(&x.peer), _ => None };
1559
1560 let message = msg_map.get(&top_id).cloned();
1561 let entity = peer.and_then(|p| match p {
1562 tl::enums::Peer::User(u) => user_map.get(&u.user_id).cloned(),
1563 _ => None,
1564 });
1565 let chat = peer.and_then(|p| match p {
1566 tl::enums::Peer::Chat(c) => chat_map.get(&c.chat_id).cloned(),
1567 tl::enums::Peer::Channel(c) => chat_map.get(&c.channel_id).cloned(),
1568 _ => None,
1569 });
1570
1571 Dialog { raw: d, message, entity, chat }
1572 }).collect();
1573
1574 Ok(result)
1575 }
1576 pub async fn delete_dialog(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1577 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1578 let req = tl::functions::messages::DeleteHistory {
1579 just_clear: false,
1580 revoke: false,
1581 peer: input_peer,
1582 max_id: 0,
1583 min_date: None,
1584 max_date: None,
1585 };
1586 self.rpc_write(&req).await
1587 }
1588
1589 pub async fn mark_as_read(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1591 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1592 match &input_peer {
1593 tl::enums::InputPeer::Channel(c) => {
1594 let req = tl::functions::channels::ReadHistory {
1595 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1596 channel_id: c.channel_id, access_hash: c.access_hash,
1597 }),
1598 max_id: 0,
1599 };
1600 self.rpc_call_raw(&req).await?;
1601 }
1602 _ => {
1603 let req = tl::functions::messages::ReadHistory { peer: input_peer, max_id: 0 };
1604 self.rpc_call_raw(&req).await?;
1605 }
1606 }
1607 Ok(())
1608 }
1609
1610 pub async fn clear_mentions(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1612 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1613 let req = tl::functions::messages::ReadMentions { peer: input_peer, top_msg_id: None };
1614 self.rpc_write(&req).await
1615 }
1616
1617 pub async fn send_chat_action(
1623 &self,
1624 peer: tl::enums::Peer,
1625 action: tl::enums::SendMessageAction,
1626 ) -> Result<(), InvocationError> {
1627 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1628 let req = tl::functions::messages::SetTyping {
1629 peer: input_peer,
1630 top_msg_id: None,
1631 action,
1632 };
1633 self.rpc_write(&req).await
1634 }
1635
1636 pub async fn join_chat(&self, peer: tl::enums::Peer) -> Result<(), InvocationError> {
1640 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
1641 match input_peer {
1642 tl::enums::InputPeer::Channel(c) => {
1643 let req = tl::functions::channels::JoinChannel {
1644 channel: tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
1645 channel_id: c.channel_id, access_hash: c.access_hash,
1646 }),
1647 };
1648 self.rpc_call_raw(&req).await?;
1649 }
1650 tl::enums::InputPeer::Chat(c) => {
1651 let req = tl::functions::messages::AddChatUser {
1652 chat_id: c.chat_id,
1653 user_id: tl::enums::InputUser::UserSelf,
1654 fwd_limit: 0,
1655 };
1656 self.rpc_call_raw(&req).await?;
1657 }
1658 _ => return Err(InvocationError::Deserialize("cannot join this peer type".into())),
1659 }
1660 Ok(())
1661 }
1662
1663 pub async fn accept_invite_link(&self, link: &str) -> Result<(), InvocationError> {
1665 let hash = Self::parse_invite_hash(link)
1666 .ok_or_else(|| InvocationError::Deserialize(format!("invalid invite link: {link}")))?;
1667 let req = tl::functions::messages::ImportChatInvite { hash: hash.to_string() };
1668 self.rpc_write(&req).await
1669 }
1670
/// Extracts the invite hash from an invite link.
///
/// Recognizes the `…/+HASH` and `…/joinchat/HASH` forms; `/+` takes
/// precedence when both markers occur. The hash ends at the first `?`, `#`
/// or `/` after the marker, so query strings, fragments and trailing slashes
/// are no longer returned as part of it. Surrounding whitespace is ignored.
///
/// # Returns
///
/// `Some(hash)` borrowed from `link`, or `None` when no marker is found or
/// the hash would be empty.
pub fn parse_invite_hash(link: &str) -> Option<&str> {
    const MARKERS: [&str; 2] = ["/+", "/joinchat/"];

    let link = link.trim();
    let tail = MARKERS
        .iter()
        .find_map(|marker| link.find(marker).map(|pos| &link[pos + marker.len()..]))?;

    // Keep only the hash itself; anything from a delimiter onwards is noise.
    let hash = tail
        .split(|c: char| matches!(c, '?' | '#' | '/'))
        .next()
        .unwrap_or(tail);

    if hash.is_empty() {
        None
    } else {
        Some(hash)
    }
}
1681
1682 pub async fn get_messages(
1686 &self,
1687 peer: tl::enums::InputPeer,
1688 limit: i32,
1689 offset_id: i32,
1690 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
1691 let req = tl::functions::messages::GetHistory {
1692 peer, offset_id, offset_date: 0, add_offset: 0,
1693 limit, max_id: 0, min_id: 0, hash: 0,
1694 };
1695 let body = self.rpc_call_raw(&req).await?;
1696 let mut cur = Cursor::from_slice(&body);
1697 let msgs = match tl::enums::messages::Messages::deserialize(&mut cur)? {
1698 tl::enums::messages::Messages::Messages(m) => m.messages,
1699 tl::enums::messages::Messages::Slice(m) => m.messages,
1700 tl::enums::messages::Messages::ChannelMessages(m) => m.messages,
1701 tl::enums::messages::Messages::NotModified(_) => vec![],
1702 };
1703 Ok(msgs.into_iter().map(update::IncomingMessage::from_raw).collect())
1704 }
1705
1706 pub async fn resolve_peer(
1710 &self,
1711 peer: &str,
1712 ) -> Result<tl::enums::Peer, InvocationError> {
1713 match peer.trim() {
1714 "me" | "self" => Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: 0 })),
1715 username if username.starts_with('@') => {
1716 self.resolve_username(&username[1..]).await
1717 }
1718 id_str => {
1719 if let Ok(id) = id_str.parse::<i64>() {
1720 Ok(tl::enums::Peer::User(tl::types::PeerUser { user_id: id }))
1721 } else {
1722 Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
1723 }
1724 }
1725 }
1726 }
1727
1728 async fn resolve_username(&self, username: &str) -> Result<tl::enums::Peer, InvocationError> {
1729 let req = tl::functions::contacts::ResolveUsername {
1730 username: username.to_string(), referer: None,
1731 };
1732 let body = self.rpc_call_raw(&req).await?;
1733 let mut cur = Cursor::from_slice(&body);
1734 let resolved = match tl::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
1735 tl::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
1736 };
1737 self.cache_users_slice(&resolved.users).await;
1739 self.cache_chats_slice(&resolved.chats).await;
1740 Ok(resolved.peer)
1741 }
1742
1743 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
1747 let body = self.rpc_call_raw(req).await?;
1748 let mut cur = Cursor::from_slice(&body);
1749 R::Return::deserialize(&mut cur).map_err(Into::into)
1750 }
1751
1752 async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1753 let mut fail_count = NonZeroU32::new(1).unwrap();
1754 let mut slept_so_far = Duration::default();
1755 loop {
1756 match self.do_rpc_call(req).await {
1757 Ok(body) => return Ok(body),
1758 Err(e) => {
1759 let ctx = RetryContext { fail_count, slept_so_far, error: e };
1760 match self.inner.retry_policy.should_retry(&ctx) {
1761 ControlFlow::Continue(delay) => {
1762 sleep(delay).await;
1763 slept_so_far += delay;
1764 fail_count = fail_count.saturating_add(1);
1765 }
1766 ControlFlow::Break(()) => return Err(ctx.error),
1767 }
1768 }
1769 }
1770 }
1771 }
1772
1773 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1783 let (tx, rx) = oneshot::channel();
1784 {
1785 let mut w = self.inner.writer.lock().await;
1786 let (wire, msg_id) = w.enc.pack_with_msg_id(req);
1787 let fk = w.frame_kind.clone();
1788 self.inner.pending.lock().await.insert(msg_id, tx);
1791 send_frame_write(&mut w.write_half, &wire, &fk).await?;
1792 }
1793 match tokio::time::timeout(Duration::from_secs(30), rx).await {
1796 Ok(Ok(result)) => result,
1797 Ok(Err(_)) => Err(InvocationError::Deserialize("RPC channel closed (reader died?)".into())),
1798 Err(_) => Err(InvocationError::Deserialize("RPC timed out after 30 s".into())),
1799 }
1800 }
1801
1802 async fn rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
1805 let mut fail_count = NonZeroU32::new(1).unwrap();
1806 let mut slept_so_far = Duration::default();
1807 loop {
1808 let result = self.do_rpc_write(req).await;
1809 match result {
1810 Ok(()) => return Ok(()),
1811 Err(e) => {
1812 let ctx = RetryContext { fail_count, slept_so_far, error: e };
1813 match self.inner.retry_policy.should_retry(&ctx) {
1814 ControlFlow::Continue(delay) => {
1815 sleep(delay).await;
1816 slept_so_far += delay;
1817 fail_count = fail_count.saturating_add(1);
1818 }
1819 ControlFlow::Break(()) => return Err(ctx.error),
1820 }
1821 }
1822 }
1823 }
1824 }
1825
1826 async fn do_rpc_write<S: tl::Serializable>(&self, req: &S) -> Result<(), InvocationError> {
1827 let (tx, rx) = oneshot::channel();
1828 {
1829 let mut w = self.inner.writer.lock().await;
1830 let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
1831 let fk = w.frame_kind.clone();
1832 self.inner.pending.lock().await.insert(msg_id, tx);
1833 send_frame_write(&mut w.write_half, &wire, &fk).await?;
1834 }
1835 match tokio::time::timeout(Duration::from_secs(30), rx).await {
1836 Ok(Ok(result)) => result.map(|_| ()), Ok(Err(_)) => Err(InvocationError::Deserialize("rpc_write channel closed".into())),
1838 Err(_) => Err(InvocationError::Deserialize("rpc_write timed out after 30 s".into())),
1839 }
1840 }
1841
1842 async fn init_connection(&self) -> Result<(), InvocationError> {
1845 use tl::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
1846 let req = InvokeWithLayer {
1847 layer: tl::LAYER,
1848 query: InitConnection {
1849 api_id: self.inner.api_id,
1850 device_model: "Linux".to_string(),
1851 system_version: "1.0".to_string(),
1852 app_version: env!("CARGO_PKG_VERSION").to_string(),
1853 system_lang_code: "en".to_string(),
1854 lang_pack: "".to_string(),
1855 lang_code: "en".to_string(),
1856 proxy: None,
1857 params: None,
1858 query: GetConfig {},
1859 },
1860 };
1861
1862 let body = self.rpc_call_raw_serializable(&req).await?;
1864
1865 let mut cur = Cursor::from_slice(&body);
1866 if let Ok(tl::enums::Config::Config(cfg)) = tl::enums::Config::deserialize(&mut cur) {
1867 let allow_ipv6 = self.inner.allow_ipv6;
1868 let mut opts = self.inner.dc_options.lock().await;
1869 for opt in &cfg.dc_options {
1870 let tl::enums::DcOption::DcOption(o) = opt;
1871 if o.media_only || o.cdn || o.tcpo_only { continue; }
1872 if o.ipv6 && !allow_ipv6 { continue; }
1873 let addr = format!("{}:{}", o.ip_address, o.port);
1874 let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
1875 dc_id: o.id, addr: addr.clone(),
1876 auth_key: None, first_salt: 0, time_offset: 0,
1877 });
1878 entry.addr = addr;
1879 }
1880 log::info!("[layer] initConnection ✓ ({} DCs, ipv6={})", cfg.dc_options.len(), allow_ipv6);
1881 }
1882 Ok(())
1883 }
1884
/// Switches the main connection over to DC `new_dc_id`.
///
/// Connects to the DC (reusing a stored auth key when one exists), records
/// the connection's auth key in the DC table, replaces the shared writer,
/// hands the new read half to the reader through `reconnect_tx`, updates the
/// home DC id and finally re-runs `init_connection`.
///
/// # Errors
///
/// Propagates connection errors and any error from `init_connection`.
async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
    // Falls back to a hard-coded address when the DC is not in the table.
    let addr = {
        let opts = self.inner.dc_options.lock().await;
        opts.get(&new_dc_id).map(|e| e.addr.clone())
            .unwrap_or_else(|| "149.154.167.51:443".to_string())
    };
    log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");

    let saved_key = {
        let opts = self.inner.dc_options.lock().await;
        opts.get(&new_dc_id).and_then(|e| e.auth_key)
    };

    let socks5 = self.inner.socks5.clone();
    let transport = self.inner.transport.clone();
    // NOTE(review): the two zero arguments look like salt and time offset;
    // the entry's stored values are not passed on here. Confirm intent.
    let conn = if let Some(key) = saved_key {
        Connection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
    } else {
        Connection::connect_raw(&addr, socks5.as_ref(), &transport).await?
    };

    // Remember the key in use so later connections to this DC can reuse it.
    let new_key = conn.auth_key_bytes();
    {
        let mut opts = self.inner.dc_options.lock().await;
        let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
            dc_id: new_dc_id, addr: addr.clone(),
            auth_key: None, first_salt: 0, time_offset: 0,
        });
        entry.auth_key = Some(new_key);
    }

    let (new_writer, new_read, new_fk) = conn.into_writer();
    let new_ak = new_writer.enc.auth_key_bytes();
    let new_sid = new_writer.enc.session_id();
    // Swap the writer first, then notify the receiving side of the new read
    // half; a failed send is deliberately ignored.
    *self.inner.writer.lock().await = new_writer;
    let _ = self.inner.reconnect_tx.send((new_read, new_fk, new_ak, new_sid));
    *self.inner.home_dc_id.lock().await = new_dc_id;
    self.init_connection().await?;
    log::info!("[layer] Now on DC{new_dc_id} ✓");
    Ok(())
}
1929
1930 async fn cache_user(&self, user: &tl::enums::User) {
1933 self.inner.peer_cache.lock().await.cache_user(user);
1934 }
1935
1936 async fn cache_users_slice(&self, users: &[tl::enums::User]) {
1937 let mut cache = self.inner.peer_cache.lock().await;
1938 cache.cache_users(users);
1939 }
1940
1941 async fn cache_chats_slice(&self, chats: &[tl::enums::Chat]) {
1942 let mut cache = self.inner.peer_cache.lock().await;
1943 cache.cache_chats(chats);
1944 }
1945
1946 #[doc(hidden)]
1948 pub async fn cache_users_slice_pub(&self, users: &[tl::enums::User]) {
1949 self.cache_users_slice(users).await;
1950 }
1951
1952 #[doc(hidden)]
1953 pub async fn cache_chats_slice_pub(&self, chats: &[tl::enums::Chat]) {
1954 self.cache_chats_slice(chats).await;
1955 }
1956
1957 #[doc(hidden)]
1959 pub async fn rpc_call_raw_pub<R: layer_tl_types::RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
1960 self.rpc_call_raw(req).await
1961 }
1962
1963 async fn rpc_call_raw_serializable<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
1965 let mut fail_count = NonZeroU32::new(1).unwrap();
1966 let mut slept_so_far = Duration::default();
1967 loop {
1968 match self.do_rpc_write_returning_body(req).await {
1969 Ok(body) => return Ok(body),
1970 Err(e) => {
1971 let ctx = RetryContext { fail_count, slept_so_far, error: e };
1972 match self.inner.retry_policy.should_retry(&ctx) {
1973 ControlFlow::Continue(delay) => {
1974 sleep(delay).await;
1975 slept_so_far += delay;
1976 fail_count = fail_count.saturating_add(1);
1977 }
1978 ControlFlow::Break(()) => return Err(ctx.error),
1979 }
1980 }
1981 }
1982 }
1983 }
1984
1985 async fn do_rpc_write_returning_body<S: tl::Serializable>(&self, req: &S) -> Result<Vec<u8>, InvocationError> {
1986 let (tx, rx) = oneshot::channel();
1987 {
1988 let mut w = self.inner.writer.lock().await;
1989 let (wire, msg_id) = w.enc.pack_serializable_with_msg_id(req);
1990 let fk = w.frame_kind.clone();
1991 self.inner.pending.lock().await.insert(msg_id, tx);
1992 send_frame_write(&mut w.write_half, &wire, &fk).await?;
1993 }
1994 match tokio::time::timeout(Duration::from_secs(30), rx).await {
1995 Ok(Ok(result)) => result,
1996 Ok(Err(_)) => Err(InvocationError::Deserialize("rpc channel closed".into())),
1997 Err(_) => Err(InvocationError::Deserialize("rpc timed out after 30 s".into())),
1998 }
1999 }
2000
2001 pub fn iter_dialogs(&self) -> DialogIter {
2018 DialogIter {
2019 offset_date: 0,
2020 offset_id: 0,
2021 offset_peer: tl::enums::InputPeer::Empty,
2022 done: false,
2023 buffer: VecDeque::new(),
2024 }
2025 }
2026
2027 pub fn iter_messages(&self, peer: tl::enums::Peer) -> MessageIter {
2041 MessageIter {
2042 peer,
2043 offset_id: 0,
2044 done: false,
2045 buffer: VecDeque::new(),
2046 }
2047 }
2048
2049 pub async fn resolve_to_input_peer(
2054 &self,
2055 peer: &tl::enums::Peer,
2056 ) -> Result<tl::enums::InputPeer, InvocationError> {
2057 let cache = self.inner.peer_cache.lock().await;
2058 match peer {
2059 tl::enums::Peer::User(u) => {
2060 if u.user_id == 0 {
2061 return Ok(tl::enums::InputPeer::PeerSelf);
2062 }
2063 match cache.users.get(&u.user_id) {
2064 Some(&hash) => Ok(tl::enums::InputPeer::User(tl::types::InputPeerUser {
2065 user_id: u.user_id, access_hash: hash,
2066 })),
2067 None => Err(InvocationError::Deserialize(format!(
2068 "access_hash unknown for user {}; resolve via username first", u.user_id
2069 ))),
2070 }
2071 }
2072 tl::enums::Peer::Chat(c) => {
2073 Ok(tl::enums::InputPeer::Chat(tl::types::InputPeerChat { chat_id: c.chat_id }))
2074 }
2075 tl::enums::Peer::Channel(c) => {
2076 match cache.channels.get(&c.channel_id) {
2077 Some(&hash) => Ok(tl::enums::InputPeer::Channel(tl::types::InputPeerChannel {
2078 channel_id: c.channel_id, access_hash: hash,
2079 })),
2080 None => Err(InvocationError::Deserialize(format!(
2081 "access_hash unknown for channel {}; resolve via username first", c.channel_id
2082 ))),
2083 }
2084 }
2085 }
2086 }
2087
2088 pub async fn invoke_on_dc<R: RemoteCall>(
2096 &self,
2097 dc_id: i32,
2098 req: &R,
2099 ) -> Result<R::Return, InvocationError> {
2100 let body = self.rpc_on_dc_raw(dc_id, req).await?;
2101 let mut cur = Cursor::from_slice(&body);
2102 R::Return::deserialize(&mut cur).map_err(Into::into)
2103 }
2104
/// Sends `req` through the DC pool to DC `dc_id` and returns the raw
/// response body.
///
/// When the pool has no connection for the DC, one is opened first (reusing
/// a stored auth key when available), its auth key is written back to the
/// DC table and the connection is inserted into the pool.
///
/// # Errors
///
/// Returns an `InvocationError::Deserialize` for a DC missing from the DC
/// table, and propagates connection and invocation errors. A failed auth
/// export/import is only logged.
async fn rpc_on_dc_raw<R: RemoteCall>(
    &self,
    dc_id: i32,
    req: &R,
) -> Result<Vec<u8>, InvocationError> {
    // NOTE(review): the pool lock is released between this check and the
    // insert below, so two concurrent callers could both open a connection.
    let needs_new = {
        let pool = self.inner.dc_pool.lock().await;
        !pool.has_connection(dc_id)
    };

    if needs_new {
        let addr = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&dc_id).map(|e| e.addr.clone())
                .ok_or_else(|| InvocationError::Deserialize(format!("unknown DC{dc_id}")))?
        };

        let socks5 = self.inner.socks5.clone();
        let transport = self.inner.transport.clone();
        let saved_key = {
            let opts = self.inner.dc_options.lock().await;
            opts.get(&dc_id).and_then(|e| e.auth_key)
        };

        let dc_conn = if let Some(key) = saved_key {
            dc_pool::DcConnection::connect_with_key(&addr, key, 0, 0, socks5.as_ref(), &transport).await?
        } else {
            let conn = dc_pool::DcConnection::connect_raw(&addr, socks5.as_ref(), &transport).await?;
            let home_dc_id = *self.inner.home_dc_id.lock().await;
            // A fresh connection to a foreign DC needs the authorization
            // copied over; failure is tolerated and only logged.
            // NOTE(review): `conn` is not in the pool yet at this point, while
            // `export_import_auth` sends the import through the pool. Verify
            // that the import reaches this connection.
            if dc_id != home_dc_id {
                if let Err(e) = self.export_import_auth(dc_id, &conn).await {
                    log::warn!("[layer] Auth export/import for DC{dc_id} failed: {e}");
                }
            }
            conn
        };

        // Remember the key so the next connection to this DC can reuse it;
        // nothing is stored when the DC has no table entry.
        let key = dc_conn.auth_key_bytes();
        {
            let mut opts = self.inner.dc_options.lock().await;
            if let Some(e) = opts.get_mut(&dc_id) {
                e.auth_key = Some(key);
            }
        }
        self.inner.dc_pool.lock().await.insert(dc_id, dc_conn);
    }

    // The pool receives a snapshot of the DC table alongside the request.
    let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
    self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, req).await
}
2158
2159 async fn export_import_auth(
2161 &self,
2162 dc_id: i32,
2163 _dc_conn: &dc_pool::DcConnection, ) -> Result<(), InvocationError> {
2165 let export_req = tl::functions::auth::ExportAuthorization { dc_id };
2167 let body = self.rpc_call_raw(&export_req).await?;
2168 let mut cur = Cursor::from_slice(&body);
2169 let exported = match tl::enums::auth::ExportedAuthorization::deserialize(&mut cur)? {
2170 tl::enums::auth::ExportedAuthorization::ExportedAuthorization(e) => e,
2171 };
2172
2173 let import_req = tl::functions::auth::ImportAuthorization {
2175 id: exported.id,
2176 bytes: exported.bytes,
2177 };
2178 let dc_entries: Vec<DcEntry> = self.inner.dc_options.lock().await.values().cloned().collect();
2179 self.inner.dc_pool.lock().await.invoke_on_dc(dc_id, &dc_entries, &import_req).await?;
2180 log::info!("[layer] Auth exported+imported to DC{dc_id} ✓");
2181 Ok(())
2182 }
2183
2184 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
2187 let body = self.rpc_call_raw(&tl::functions::account::GetPassword {}).await?;
2188 let mut cur = Cursor::from_slice(&body);
2189 let pw = match tl::enums::account::Password::deserialize(&mut cur)? {
2190 tl::enums::account::Password::Password(p) => p,
2191 };
2192 Ok(PasswordToken { password: pw })
2193 }
2194
2195 fn make_send_code_req(&self, phone: &str) -> tl::functions::auth::SendCode {
2196 tl::functions::auth::SendCode {
2197 phone_number: phone.to_string(),
2198 api_id: self.inner.api_id,
2199 api_hash: self.inner.api_hash.clone(),
2200 settings: tl::enums::CodeSettings::CodeSettings(
2201 tl::types::CodeSettings {
2202 allow_flashcall: false, current_number: false, allow_app_hash: false,
2203 allow_missed_call: false, allow_firebase: false, unknown_number: false,
2204 logout_tokens: None, token: None, app_sandbox: None,
2205 },
2206 ),
2207 }
2208 }
2209
2210 fn extract_user_name(user: &tl::enums::User) -> String {
2211 match user {
2212 tl::enums::User::User(u) => {
2213 format!("{} {}",
2214 u.first_name.as_deref().unwrap_or(""),
2215 u.last_name.as_deref().unwrap_or(""))
2216 .trim().to_string()
2217 }
2218 tl::enums::User::Empty(_) => "(unknown)".into(),
2219 }
2220 }
2221
2222 fn extract_password_params(
2223 algo: &tl::enums::PasswordKdfAlgo,
2224 ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
2225 match algo {
2226 tl::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
2227 Ok((&a.salt1, &a.salt2, &a.p, a.g))
2228 }
2229 _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
2230 }
2231 }
2232}
2233
2234pub struct DialogIter {
2238 offset_date: i32,
2239 offset_id: i32,
2240 offset_peer: tl::enums::InputPeer,
2241 done: bool,
2242 buffer: VecDeque<Dialog>,
2243}
2244
2245impl DialogIter {
2246 const PAGE_SIZE: i32 = 100;
2247
2248 pub async fn next(&mut self, client: &Client) -> Result<Option<Dialog>, InvocationError> {
2250 if let Some(d) = self.buffer.pop_front() { return Ok(Some(d)); }
2251 if self.done { return Ok(None); }
2252
2253 let req = tl::functions::messages::GetDialogs {
2254 exclude_pinned: false,
2255 folder_id: None,
2256 offset_date: self.offset_date,
2257 offset_id: self.offset_id,
2258 offset_peer: self.offset_peer.clone(),
2259 limit: Self::PAGE_SIZE,
2260 hash: 0,
2261 };
2262
2263 let dialogs = client.get_dialogs_raw(req).await?;
2264 if dialogs.is_empty() || dialogs.len() < Self::PAGE_SIZE as usize {
2265 self.done = true;
2266 }
2267
2268 if let Some(last) = dialogs.last() {
2270 self.offset_date = last.message.as_ref().map(|m| match m {
2271 tl::enums::Message::Message(x) => x.date,
2272 tl::enums::Message::Service(x) => x.date,
2273 _ => 0,
2274 }).unwrap_or(0);
2275 self.offset_id = last.top_message();
2276 if let Some(peer) = last.peer() {
2277 self.offset_peer = client.inner.peer_cache.lock().await.peer_to_input(peer);
2278 }
2279 }
2280
2281 self.buffer.extend(dialogs);
2282 Ok(self.buffer.pop_front())
2283 }
2284}
2285
2286pub struct MessageIter {
2288 peer: tl::enums::Peer,
2289 offset_id: i32,
2290 done: bool,
2291 buffer: VecDeque<update::IncomingMessage>,
2292}
2293
2294impl MessageIter {
2295 const PAGE_SIZE: i32 = 100;
2296
2297 pub async fn next(&mut self, client: &Client) -> Result<Option<update::IncomingMessage>, InvocationError> {
2299 if let Some(m) = self.buffer.pop_front() { return Ok(Some(m)); }
2300 if self.done { return Ok(None); }
2301
2302 let input_peer = client.inner.peer_cache.lock().await.peer_to_input(&self.peer);
2303 let page = client.get_messages(input_peer, Self::PAGE_SIZE, self.offset_id).await?;
2304
2305 if page.is_empty() || page.len() < Self::PAGE_SIZE as usize {
2306 self.done = true;
2307 }
2308 if let Some(last) = page.last() {
2309 self.offset_id = last.id();
2310 }
2311
2312 self.buffer.extend(page);
2313 Ok(self.buffer.pop_front())
2314 }
2315}
2316
2317#[doc(hidden)]
2321pub fn random_i64_pub() -> i64 { random_i64() }
2322
/// Framing used for packets on an established connection.
#[derive(Clone)]
enum FrameKind {
    /// Length prefix counted in 4-byte words: one byte, or `0x7f` followed by
    /// three little-endian bytes.
    Abridged,
    /// Four-byte little-endian byte-length prefix.
    Intermediate,
    /// Carries send/receive sequence counters; the frame helpers in this file
    /// currently write and read it with the same length prefix as
    /// `Intermediate`.
    #[allow(dead_code)]
    Full {
        send_seqno: u32,
        recv_seqno: u32,
    },
}
2333
2334
2335struct ConnectionWriter {
2340 write_half: OwnedWriteHalf,
2341 enc: EncryptedSession,
2342 frame_kind: FrameKind,
2343}
2344
2345impl ConnectionWriter {
2346 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
2347 fn first_salt(&self) -> i64 { self.enc.salt }
2348 fn time_offset(&self) -> i32 { self.enc.time_offset }
2349}
2350
2351struct Connection {
2352 stream: TcpStream,
2353 enc: EncryptedSession,
2354 frame_kind: FrameKind,
2355}
2356
2357impl Connection {
2358 async fn open_stream(
2360 addr: &str,
2361 socks5: Option<&crate::socks5::Socks5Config>,
2362 transport: &TransportKind,
2363 ) -> Result<(TcpStream, FrameKind), InvocationError> {
2364 let stream = match socks5 {
2365 Some(proxy) => proxy.connect(addr).await?,
2366 None => TcpStream::connect(addr).await?,
2367 };
2368 Self::apply_transport_init(stream, transport).await
2369 }
2370
2371 async fn apply_transport_init(
2373 mut stream: TcpStream,
2374 transport: &TransportKind,
2375 ) -> Result<(TcpStream, FrameKind), InvocationError> {
2376 match transport {
2377 TransportKind::Abridged => {
2378 stream.write_all(&[0xef]).await?;
2379 Ok((stream, FrameKind::Abridged))
2380 }
2381 TransportKind::Intermediate => {
2382 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
2383 Ok((stream, FrameKind::Intermediate))
2384 }
2385 TransportKind::Full => {
2386 Ok((stream, FrameKind::Full { send_seqno: 0, recv_seqno: 0 }))
2388 }
2389 TransportKind::Obfuscated { secret } => {
2390 let mut nonce = [0u8; 64];
2399 getrandom::getrandom(&mut nonce).map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
2400 let (enc_key, enc_iv, _dec_key, _dec_iv) = crate::transport_obfuscated::derive_keys(&nonce, secret.as_ref());
2402 let mut enc_cipher = crate::transport_obfuscated::ObfCipher::new(enc_key, enc_iv);
2403 let mut handshake = nonce;
2405 handshake[56] = 0xef; handshake[57] = 0xef;
2406 handshake[58] = 0xef; handshake[59] = 0xef;
2407 enc_cipher.apply(&mut handshake[56..]);
2408 stream.write_all(&handshake).await?;
2409 Ok((stream, FrameKind::Abridged))
2410 }
2411 }
2412 }
2413
2414 async fn connect_raw(
2415 addr: &str,
2416 socks5: Option<&crate::socks5::Socks5Config>,
2417 transport: &TransportKind,
2418 ) -> Result<Self, InvocationError> {
2419 log::info!("[layer] Connecting to {addr} (DH) …");
2420
2421 let addr2 = addr.to_string();
2425 let socks5_c = socks5.cloned();
2426 let transport_c = transport.clone();
2427
2428 let fut = async move {
2429 let (mut stream, frame_kind) =
2430 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2431
2432 let mut plain = Session::new();
2433
2434 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2435 send_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes(), &frame_kind).await?;
2436 let res_pq: tl::enums::ResPq = recv_frame_plain(&mut stream, &frame_kind).await?;
2437
2438 let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2439 send_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes(), &frame_kind).await?;
2440 let dh: tl::enums::ServerDhParams = recv_frame_plain(&mut stream, &frame_kind).await?;
2441
2442 let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2443 send_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes(), &frame_kind).await?;
2444 let ans: tl::enums::SetClientDhParamsAnswer = recv_frame_plain(&mut stream, &frame_kind).await?;
2445
2446 let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
2447 log::info!("[layer] DH complete ✓");
2448
2449 Ok::<Self, InvocationError>(Self {
2450 stream,
2451 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
2452 frame_kind,
2453 })
2454 };
2455
2456 tokio::time::timeout(Duration::from_secs(15), fut)
2457 .await
2458 .map_err(|_| InvocationError::Deserialize(
2459 format!("DH handshake with {addr} timed out after 15 s")
2460 ))?
2461 }
2462
2463 async fn connect_with_key(
2464 addr: &str,
2465 auth_key: [u8; 256],
2466 first_salt: i64,
2467 time_offset: i32,
2468 socks5: Option<&crate::socks5::Socks5Config>,
2469 transport: &TransportKind,
2470 ) -> Result<Self, InvocationError> {
2471 let addr2 = addr.to_string();
2472 let socks5_c = socks5.cloned();
2473 let transport_c = transport.clone();
2474
2475 let fut = async move {
2476 let (stream, frame_kind) =
2477 Self::open_stream(&addr2, socks5_c.as_ref(), &transport_c).await?;
2478 Ok::<Self, InvocationError>(Self {
2479 stream,
2480 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
2481 frame_kind,
2482 })
2483 };
2484
2485 tokio::time::timeout(Duration::from_secs(15), fut)
2486 .await
2487 .map_err(|_| InvocationError::Deserialize(
2488 format!("connect_with_key to {addr} timed out after 15 s")
2489 ))?
2490 }
2491
2492 fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
2493
2494 fn into_writer(self) -> (ConnectionWriter, OwnedReadHalf, FrameKind) {
2496 let (read_half, write_half) = self.stream.into_split();
2497 let writer = ConnectionWriter {
2498 write_half,
2499 enc: self.enc,
2500 frame_kind: self.frame_kind.clone(),
2501 };
2502 (writer, read_half, self.frame_kind)
2503 }
2504}
2505
2506async fn send_frame(
2510 stream: &mut TcpStream,
2511 data: &[u8],
2512 kind: &FrameKind,
2513) -> Result<(), InvocationError> {
2514 match kind {
2515 FrameKind::Abridged => send_abridged(stream, data).await,
2516 FrameKind::Intermediate => {
2517 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2518 stream.write_all(data).await?;
2519 Ok(())
2520 }
2521 FrameKind::Full { .. } => {
2522 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2528 stream.write_all(data).await?;
2529 Ok(())
2530 }
2531 }
2532}
2533
2534enum FrameOutcome {
2538 Frame(Vec<u8>),
2539 Error(InvocationError),
2540 Keepalive, }
2542
2543async fn recv_frame_with_keepalive(
2546 rh: &mut OwnedReadHalf,
2547 fk: &FrameKind,
2548 client: &Client,
2549 _ak: &[u8; 256],
2550) -> FrameOutcome {
2551 match tokio::time::timeout(Duration::from_secs(25), recv_frame_read(rh, fk)).await {
2552 Ok(Ok(raw)) => FrameOutcome::Frame(raw),
2553 Ok(Err(e)) => FrameOutcome::Error(e),
2554 Err(_) => {
2555 let ping_req = tl::functions::Ping { ping_id: random_i64() };
2557 let mut w = client.inner.writer.lock().await;
2558 let wire = w.enc.pack(&ping_req);
2559 let fk = w.frame_kind.clone();
2560 let _ = send_frame_write(&mut w.write_half, &wire, &fk).await;
2561 FrameOutcome::Keepalive
2562 }
2563 }
2564}
2565
2566async fn send_frame_write(
2568 stream: &mut OwnedWriteHalf,
2569 data: &[u8],
2570 kind: &FrameKind,
2571) -> Result<(), InvocationError> {
2572 match kind {
2573 FrameKind::Abridged => {
2574 let words = data.len() / 4;
2575 if words < 0x7f {
2576 stream.write_all(&[words as u8]).await?;
2577 } else {
2578 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
2579 stream.write_all(&b).await?;
2580 }
2581 stream.write_all(data).await?;
2582 Ok(())
2583 }
2584 FrameKind::Intermediate | FrameKind::Full { .. } => {
2585 stream.write_all(&(data.len() as u32).to_le_bytes()).await?;
2586 stream.write_all(data).await?;
2587 Ok(())
2588 }
2589 }
2590}
2591
2592async fn recv_frame_read(
2594 stream: &mut OwnedReadHalf,
2595 kind: &FrameKind,
2596) -> Result<Vec<u8>, InvocationError> {
2597 match kind {
2598 FrameKind::Abridged => {
2599 let mut h = [0u8; 1];
2600 stream.read_exact(&mut h).await?;
2601 let words = if h[0] < 0x7f {
2602 h[0] as usize
2603 } else {
2604 let mut b = [0u8; 3];
2605 stream.read_exact(&mut b).await?;
2606 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
2607 };
2608 let len = words * 4;
2609 let mut buf = vec![0u8; len];
2610 stream.read_exact(&mut buf).await?;
2611 Ok(buf)
2612 }
2613 FrameKind::Intermediate | FrameKind::Full { .. } => {
2614 let mut len_buf = [0u8; 4];
2615 stream.read_exact(&mut len_buf).await?;
2616 let len = u32::from_le_bytes(len_buf) as usize;
2617 let mut buf = vec![0u8; len];
2618 stream.read_exact(&mut buf).await?;
2619 Ok(buf)
2620 }
2621 }
2622}
2623
2624
2625async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
2627 let words = data.len() / 4;
2628 if words < 0x7f {
2629 stream.write_all(&[words as u8]).await?;
2630 } else {
2631 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
2632 stream.write_all(&b).await?;
2633 }
2634 stream.write_all(data).await?;
2635 Ok(())
2636}
2637
2638async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
2639 let mut h = [0u8; 1];
2640 stream.read_exact(&mut h).await?;
2641 let words = if h[0] < 0x7f {
2642 h[0] as usize
2643 } else {
2644 let mut b = [0u8; 3];
2645 stream.read_exact(&mut b).await?;
2646 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
2647 if w == 1 {
2649 let mut code_buf = [0u8; 4];
2650 stream.read_exact(&mut code_buf).await?;
2651 let code = i32::from_le_bytes(code_buf);
2652 return Err(InvocationError::Rpc(RpcError::from_telegram(code, "transport error")));
2653 }
2654 w
2655 };
2656 if words == 0 || words > 0x8000 {
2659 return Err(InvocationError::Deserialize(
2660 format!("abridged: implausible word count {words} (possible transport error or framing mismatch)")
2661 ));
2662 }
2663 let mut buf = vec![0u8; words * 4];
2664 stream.read_exact(&mut buf).await?;
2665 Ok(buf)
2666}
2667
2668async fn recv_frame_plain<T: Deserializable>(
2670 stream: &mut TcpStream,
2671 _kind: &FrameKind,
2672) -> Result<T, InvocationError> {
2673 let raw = recv_abridged(stream).await?; if raw.len() < 20 {
2675 return Err(InvocationError::Deserialize("plaintext frame too short".into()));
2676 }
2677 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
2678 return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
2679 }
2680 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
2681 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
2682 T::deserialize(&mut cur).map_err(Into::into)
2683}
2684
2685enum EnvelopeResult {
2688 Payload(Vec<u8>),
2689 Updates(Vec<update::Update>),
2690 None,
2691}
2692
2693fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
2694 if body.len() < 4 {
2695 return Err(InvocationError::Deserialize("body < 4 bytes".into()));
2696 }
2697 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
2698
2699 match cid {
2700 ID_RPC_RESULT => {
2701 if body.len() < 12 {
2702 return Err(InvocationError::Deserialize("rpc_result too short".into()));
2703 }
2704 unwrap_envelope(body[12..].to_vec())
2705 }
2706 ID_RPC_ERROR => {
2707 if body.len() < 8 {
2708 return Err(InvocationError::Deserialize("rpc_error too short".into()));
2709 }
2710 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
2711 let message = tl_read_string(&body[8..]).unwrap_or_default();
2712 Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
2713 }
2714 ID_MSG_CONTAINER => {
2715 if body.len() < 8 {
2716 return Err(InvocationError::Deserialize("container too short".into()));
2717 }
2718 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
2719 let mut pos = 8usize;
2720 let mut payload: Option<Vec<u8>> = None;
2721 let mut updates_buf: Vec<update::Update> = Vec::new();
2722
2723 for _ in 0..count {
2724 if pos + 16 > body.len() { break; }
2725 let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
2726 pos += 16;
2727 if pos + inner_len > body.len() { break; }
2728 let inner = body[pos..pos + inner_len].to_vec();
2729 pos += inner_len;
2730 match unwrap_envelope(inner)? {
2731 EnvelopeResult::Payload(p) => { payload = Some(p); }
2732 EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
2733 EnvelopeResult::None => {}
2734 }
2735 }
2736 if let Some(p) = payload {
2737 Ok(EnvelopeResult::Payload(p))
2738 } else if !updates_buf.is_empty() {
2739 Ok(EnvelopeResult::Updates(updates_buf))
2740 } else {
2741 Ok(EnvelopeResult::None)
2742 }
2743 }
2744 ID_GZIP_PACKED => {
2745 let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
2746 unwrap_envelope(gz_inflate(&bytes)?)
2747 }
2748 ID_PONG | ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY
2750 | 0xd33b5459 | 0x04deb57d | 0x8cc0d131 | 0x276d3ec6 | 0x809db6df | 0x7d861a08 | 0x0949d9dc | 0xae500895 | 0x9299359f | 0xe22045fc | 0x62d350c9 => {
2763 Ok(EnvelopeResult::None)
2764 }
2765 ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED
2766 | ID_UPDATE_SHORT_MSG | ID_UPDATE_SHORT_CHAT_MSG
2767 | ID_UPDATES_TOO_LONG => {
2768 Ok(EnvelopeResult::Updates(update::parse_updates(&body)))
2769 }
2770 _ => Ok(EnvelopeResult::Payload(body)),
2771 }
2772}
2773
2774fn random_i64() -> i64 {
2777 let mut b = [0u8; 8];
2778 getrandom::getrandom(&mut b).expect("getrandom");
2779 i64::from_le_bytes(b)
2780}
2781
/// Decodes a TL-encoded byte string from the start of `data`.
///
/// A first byte below 254 is the length itself; otherwise the next three bytes
/// hold the length (little-endian). Empty input decodes to an empty vector.
/// Returns `None` when the header or the announced content does not fit in
/// `data`.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let marker = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };

    let (len, start) = if marker < 254 {
        (usize::from(marker), 1)
    } else {
        let ext = data.get(1..4)?;
        let len = usize::from(ext[0]) | (usize::from(ext[1]) << 8) | (usize::from(ext[2]) << 16);
        (len, 4)
    };

    data.get(start..start + len).map(|content| content.to_vec())
}
2791
2792fn tl_read_string(data: &[u8]) -> Option<String> {
2793 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
2794}
2795
2796fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
2797 use std::io::Read;
2798 let mut out = Vec::new();
2799 if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
2800 return Ok(out);
2801 }
2802 out.clear();
2803 flate2::read::ZlibDecoder::new(data)
2804 .read_to_end(&mut out)
2805 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
2806 Ok(out)
2807}