1#![deny(unsafe_code)]
89
90mod errors;
91mod retry;
92mod session;
93mod transport;
94mod two_factor_auth;
95pub mod update;
96
97pub use errors::{InvocationError, LoginToken, PasswordToken, RpcError, SignInError};
98pub use retry::{AutoSleep, NoRetries, RetryContext, RetryPolicy};
99pub use update::Update;
100
101use std::collections::HashMap;
102use std::num::NonZeroU32;
103use std::ops::ControlFlow;
104use std::path::PathBuf;
105use std::sync::Arc;
106use std::time::Duration;
107
108use layer_mtproto::{EncryptedSession, Session, authentication as auth};
109use layer_tl_types::{Cursor, Deserializable, RemoteCall};
110use session::{DcEntry, PersistedSession};
111use tokio::io::{AsyncReadExt, AsyncWriteExt};
112use tokio::net::TcpStream;
113use tokio::sync::{mpsc, Mutex};
114use tokio::time::sleep;
115
// TL constructor IDs. `unwrap_envelope` compares the little-endian `u32` at
// the start of each decrypted message body against these values to decide how
// the body is handled.

/// RPC result wrapper: the first 12 bytes are skipped and the rest is unwrapped again.
const ID_RPC_RESULT: u32 = 0xf35c6d01;
/// RPC error: an `i32` code followed by a TL string message.
const ID_RPC_ERROR: u32 = 0x2144ca19;
/// Message container: a `u32` count followed by framed inner messages.
const ID_MSG_CONTAINER: u32 = 0x73f1f8dc;
/// Compressed body: a TL byte string that is inflated and unwrapped again.
const ID_GZIP_PACKED: u32 = 0x3072cfa1;

// Service messages: `unwrap_envelope` maps all of these to `EnvelopeResult::None`.
const ID_PONG: u32 = 0x347773c5;
const ID_MSGS_ACK: u32 = 0x62d6b459;
const ID_BAD_SERVER_SALT: u32 = 0xedab447b;
const ID_NEW_SESSION: u32 = 0x9ec20908;
const ID_BAD_MSG_NOTIFY: u32 = 0xa7eff811;

// Update envelopes: bodies starting with these IDs are handed to
// `parse_updates_envelope`.
const ID_UPDATES: u32 = 0x74ae4240;
// NOTE(review): the public Telegram schema lists `updateShort` as 0x78d4dec1;
// confirm that 0x2114be86 is the intended constructor for the targeted layer.
const ID_UPDATE_SHORT: u32 = 0x2114be86;
const ID_UPDATES_COMBINED: u32 = 0x725b04c3;
const ID_UPDATE_SHORT_MSG: u32 = 0x313bc7f8;
131
132#[derive(Clone)]
136pub struct Config {
137 pub session_path: PathBuf,
139 pub api_id: i32,
141 pub api_hash: String,
143 pub dc_addr: Option<String>,
145 pub retry_policy: Arc<dyn RetryPolicy>,
147}
148
149impl Default for Config {
150 fn default() -> Self {
151 Self {
152 session_path: "layer.session".into(),
153 api_id: 0,
154 api_hash: String::new(),
155 dc_addr: None,
156 retry_policy: Arc::new(AutoSleep::default()),
157 }
158 }
159}
160
/// Tuning knobs for update delivery.
#[derive(Debug, Clone)]
pub struct UpdatesConfiguration {
    /// Upper bound on queued updates; `None` means no bound.
    ///
    /// NOTE(review): nothing visible in this file reads this value —
    /// `stream_updates` uses an unbounded channel. Confirm where it is applied.
    pub update_queue_limit: Option<usize>,
}

impl Default for UpdatesConfiguration {
    /// Defaults to a queue limit of 500 updates.
    fn default() -> Self {
        Self { update_queue_limit: Some(500) }
    }
}
176
/// Receiving half of the channel fed by the background update loop that
/// `Client::stream_updates` spawns.
pub struct UpdateStream {
    rx: mpsc::UnboundedReceiver<update::Update>,
}

impl UpdateStream {
    /// Waits for the next update.
    ///
    /// Returns `None` once the sending side of the channel has been dropped
    /// and no buffered updates remain.
    pub async fn next(&mut self) -> Option<update::Update> {
        self.rx.recv().await
    }
}
194
195#[derive(Debug, Clone)]
199pub struct Dialog {
200 pub raw: layer_tl_types::enums::Dialog,
201 pub message: Option<layer_tl_types::enums::Message>,
203 pub entity: Option<layer_tl_types::enums::User>,
205}
206
207impl Dialog {
208 pub fn title(&self) -> String {
210 if let Some(layer_tl_types::enums::User::User(u)) = &self.entity {
211 let first = u.first_name.as_deref().unwrap_or("");
212 let last = u.last_name.as_deref().unwrap_or("");
213 return format!("{first} {last}").trim().to_string();
214 }
215 "(Unknown)".to_string()
216 }
217}
218
/// State shared by every clone of [`Client`].
struct ClientInner {
    /// The single live connection; every RPC and the update loop lock it.
    conn: Mutex<Connection>,
    /// Id of the data center the account currently lives on.
    home_dc_id: Mutex<i32>,
    /// Known data centers keyed by DC id (address plus any stored auth key).
    dc_options: Mutex<HashMap<i32, DcEntry>>,
    api_id: i32,
    api_hash: String,
    /// Where `save_session` writes the persisted session.
    session_path: PathBuf,
    /// Consulted by `rpc_call_raw` after each failed call.
    retry_policy: Arc<dyn RetryPolicy>,
    /// Sender half of the channel created in `connect`; the visible code only
    /// stores it and never sends on it.
    _update_tx: mpsc::UnboundedSender<update::Update>,
}
231
/// Handle to a Telegram connection.
///
/// Cloning copies two `Arc`s, so all clones share the same connection and state.
#[derive(Clone)]
pub struct Client {
    inner: Arc<ClientInner>,
    /// Receiver half of the channel created in `connect`; the visible code only
    /// stores it and never reads from it.
    _update_rx: Arc<Mutex<mpsc::UnboundedReceiver<update::Update>>>,
}
238
239impl Client {
240 pub async fn connect(config: Config) -> Result<Self, InvocationError> {
247 let (update_tx, update_rx) = mpsc::unbounded_channel();
248
249 let (conn, home_dc_id, dc_opts) =
251 if config.session_path.exists() {
252 match PersistedSession::load(&config.session_path) {
253 Ok(s) => {
254 if let Some(dc) = s.dcs.iter().find(|d| d.dc_id == s.home_dc_id) {
255 if let Some(key) = dc.auth_key {
256 log::info!("[layer] Loading session (DC{}) …", s.home_dc_id);
257 match Connection::connect_with_key(&dc.addr, key, dc.first_salt, dc.time_offset).await {
258 Ok(c) => {
259 let mut opts = session::default_dc_addresses()
260 .into_iter()
261 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
262 .collect::<HashMap<_, _>>();
263 for d in &s.dcs {
264 opts.insert(d.dc_id, d.clone());
265 }
266 (c, s.home_dc_id, opts)
267 }
268 Err(e) => {
269 log::warn!("[layer] Session connect failed ({e}), fresh connect …");
270 Self::fresh_connect().await?
271 }
272 }
273 } else {
274 Self::fresh_connect().await?
275 }
276 } else {
277 Self::fresh_connect().await?
278 }
279 }
280 Err(e) => {
281 log::warn!("[layer] Session load failed ({e}), fresh connect …");
282 Self::fresh_connect().await?
283 }
284 }
285 } else {
286 Self::fresh_connect().await?
287 };
288
289 let inner = Arc::new(ClientInner {
290 conn: Mutex::new(conn),
291 home_dc_id: Mutex::new(home_dc_id),
292 dc_options: Mutex::new(dc_opts),
293 api_id: config.api_id,
294 api_hash: config.api_hash,
295 session_path: config.session_path,
296 retry_policy: config.retry_policy,
297 _update_tx: update_tx,
298 });
299
300 let client = Self {
301 inner,
302 _update_rx: Arc::new(Mutex::new(update_rx)),
303 };
304
305 client.init_connection().await?;
307 Ok(client)
308 }
309
310 async fn fresh_connect() -> Result<(Connection, i32, HashMap<i32, DcEntry>), InvocationError> {
311 log::info!("[layer] Fresh connect to DC2 …");
312 let conn = Connection::connect_raw("149.154.167.51:443").await?;
313 let opts = session::default_dc_addresses()
314 .into_iter()
315 .map(|(id, addr)| (id, DcEntry { dc_id: id, addr, auth_key: None, first_salt: 0, time_offset: 0 }))
316 .collect();
317 Ok((conn, 2, opts))
318 }
319
320 pub async fn save_session(&self) -> Result<(), InvocationError> {
324 let conn_guard = self.inner.conn.lock().await;
325 let home_dc_id = *self.inner.home_dc_id.lock().await;
326 let dc_options = self.inner.dc_options.lock().await;
327
328 let dcs = dc_options.values().map(|e| {
329 DcEntry {
330 dc_id: e.dc_id,
331 addr: e.addr.clone(),
332 auth_key: if e.dc_id == home_dc_id { Some(conn_guard.auth_key_bytes()) } else { e.auth_key },
333 first_salt: if e.dc_id == home_dc_id { conn_guard.first_salt() } else { e.first_salt },
334 time_offset: if e.dc_id == home_dc_id { conn_guard.time_offset() } else { e.time_offset },
335 }
336 }).collect();
337
338 PersistedSession { home_dc_id, dcs }
339 .save(&self.inner.session_path)
340 .map_err(|e| InvocationError::Io(e))?;
341 log::info!("[layer] Session saved ✓");
342 Ok(())
343 }
344
345 pub async fn is_authorized(&self) -> Result<bool, InvocationError> {
349 match self.invoke(&layer_tl_types::functions::updates::GetState {}).await {
350 Ok(_) => Ok(true),
351 Err(e) if e.is("AUTH_KEY_UNREGISTERED") || matches!(&e, InvocationError::Rpc(r) if r.code == 401) => Ok(false),
352 Err(e) => Err(e),
353 }
354 }
355
356 pub async fn bot_sign_in(&self, token: &str) -> Result<String, InvocationError> {
363 let req = layer_tl_types::functions::auth::ImportBotAuthorization {
364 flags: 0,
365 api_id: self.inner.api_id,
366 api_hash: self.inner.api_hash.clone(),
367 bot_auth_token: token.to_string(),
368 };
369
370 let result = match self.invoke(&req).await {
371 Ok(x) => x,
372 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
373 let dc_id = r.value.unwrap_or(2) as i32;
374 self.migrate_to(dc_id).await?;
375 self.invoke(&req).await?
376 }
377 Err(e) => return Err(e),
378 };
379
380 let name = match result {
381 layer_tl_types::enums::auth::Authorization::Authorization(a) => {
382 Self::extract_user_name(&a.user)
383 }
384 layer_tl_types::enums::auth::Authorization::SignUpRequired(_) => {
385 panic!("unexpected SignUpRequired during bot sign-in")
386 }
387 };
388 log::info!("[layer] Bot signed in ✓ ({name})");
389 Ok(name)
390 }
391
392 pub async fn request_login_code(&self, phone: &str) -> Result<LoginToken, InvocationError> {
398 use layer_tl_types::enums::auth::SentCode;
399
400 let req = self.make_send_code_req(phone);
401 let body = match self.rpc_call_raw(&req).await {
402 Ok(b) => b,
403 Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
404 let dc_id = r.value.unwrap_or(2) as i32;
405 self.migrate_to(dc_id).await?;
406 self.rpc_call_raw(&req).await?
407 }
408 Err(e) => return Err(e),
409 };
410
411 let mut cur = Cursor::from_slice(&body);
412 let hash = match layer_tl_types::enums::auth::SentCode::deserialize(&mut cur)? {
413 SentCode::SentCode(c) => c.phone_code_hash,
414 SentCode::Success(_) => return Err(InvocationError::Deserialize("unexpected SentCode::Success".into())),
415 SentCode::PaymentRequired(_) => return Err(InvocationError::Deserialize("payment required".into())),
416 };
417 log::info!("[layer] Login code sent");
418 Ok(LoginToken { phone: phone.to_string(), phone_code_hash: hash })
419 }
420
    /// Completes a phone login with the code the user received.
    ///
    /// `code` is trimmed before being sent. Returns the signed-in user's
    /// display name.
    ///
    /// # Errors
    ///
    /// * `SignInError::PasswordRequired` — the server answered
    ///   `SESSION_PASSWORD_NEEDED`; the contained token is meant for
    ///   `check_password`.
    /// * `SignInError::InvalidCode` — the error matched `PHONE_CODE_*`.
    /// * `SignInError::SignUpRequired` — the server returned `SignUpRequired`.
    /// * `SignInError::Other` — any other request, migration or decoding error.
    pub async fn sign_in(&self, token: &LoginToken, code: &str) -> Result<String, SignInError> {
        let req = layer_tl_types::functions::auth::SignIn {
            phone_number: token.phone.clone(),
            phone_code_hash: token.phone_code_hash.clone(),
            phone_code: Some(code.trim().to_string()),
            email_verification: None,
        };

        // Arm order matters: the 303 migration check runs before the
        // name-based checks, and the catch-all comes last.
        let body = match self.rpc_call_raw(&req).await {
            Ok(b) => b,
            Err(InvocationError::Rpc(ref r)) if r.code == 303 => {
                // Code 303: move to the DC in the error value and retry once.
                let dc_id = r.value.unwrap_or(2) as i32;
                self.migrate_to(dc_id).await.map_err(SignInError::Other)?;
                self.rpc_call_raw(&req).await.map_err(SignInError::Other)?
            }
            Err(e) if e.is("SESSION_PASSWORD_NEEDED") => {
                // 2FA is enabled: fetch the password parameters for the caller.
                let token = self.get_password_info().await.map_err(SignInError::Other)?;
                return Err(SignInError::PasswordRequired(token));
            }
            Err(e) if e.is("PHONE_CODE_*") => return Err(SignInError::InvalidCode),
            Err(e) => return Err(SignInError::Other(e)),
        };

        let mut cur = Cursor::from_slice(&body);
        match layer_tl_types::enums::auth::Authorization::deserialize(&mut cur)
            .map_err(|e| SignInError::Other(e.into()))?
        {
            layer_tl_types::enums::auth::Authorization::Authorization(a) => {
                let name = Self::extract_user_name(&a.user);
                log::info!("[layer] Signed in ✓ Welcome, {name}!");
                Ok(name)
            }
            layer_tl_types::enums::auth::Authorization::SignUpRequired(_) => {
                Err(SignInError::SignUpRequired)
            }
        }
    }
464
    /// Finishes a login that requires the account's two-factor password.
    ///
    /// `token` is the value carried by `SignInError::PasswordRequired`.
    /// Returns the signed-in user's display name.
    ///
    /// # Errors
    ///
    /// Returns `InvocationError::Deserialize` when the password info lacks
    /// `current_algo`, `srp_b` or `srp_id`, when the KDF algorithm is not
    /// supported, or when the server answers `SignUpRequired`; request errors
    /// are passed through.
    pub async fn check_password(
        &self,
        token: PasswordToken,
        password: impl AsRef<[u8]>,
    ) -> Result<String, InvocationError> {
        let pw = token.password;
        let algo = pw.current_algo.ok_or_else(|| InvocationError::Deserialize("no current_algo".into()))?;

        let (salt1, salt2, p, g) = Self::extract_password_params(&algo)?;
        let g_b = pw.srp_b.ok_or_else(|| InvocationError::Deserialize("no srp_b".into()))?;
        // NOTE(review): the server-provided `secure_random` is passed straight
        // to `calculate_2fa` as `a` — confirm this is the intended input.
        let a = pw.secure_random;
        let srp_id = pw.srp_id.ok_or_else(|| InvocationError::Deserialize("no srp_id".into()))?;

        // Presumably the SRP proof (`m1`) and the client public value (`g_a`);
        // the helper lives in `two_factor_auth` and is not visible here.
        let (m1, g_a) = two_factor_auth::calculate_2fa(salt1, salt2, p, g, &g_b, &a, password.as_ref());

        let req = layer_tl_types::functions::auth::CheckPassword {
            password: layer_tl_types::enums::InputCheckPasswordSrp::InputCheckPasswordSrp(
                layer_tl_types::types::InputCheckPasswordSrp {
                    srp_id,
                    a: g_a.to_vec(),
                    m1: m1.to_vec(),
                },
            ),
        };

        let body = self.rpc_call_raw(&req).await?;
        let mut cur = Cursor::from_slice(&body);
        match layer_tl_types::enums::auth::Authorization::deserialize(&mut cur)? {
            layer_tl_types::enums::auth::Authorization::Authorization(a) => {
                let name = Self::extract_user_name(&a.user);
                log::info!("[layer] 2FA ✓ Welcome, {name}!");
                Ok(name)
            }
            layer_tl_types::enums::auth::Authorization::SignUpRequired(_) => {
                Err(InvocationError::Deserialize("unexpected SignUpRequired after 2FA".into()))
            }
        }
    }
506
507 pub async fn sign_out(&self) -> Result<bool, InvocationError> {
509 let req = layer_tl_types::functions::auth::LogOut {};
510 match self.rpc_call_raw(&req).await {
511 Ok(_body) => {
512 log::info!("[layer] Signed out ✓");
514 Ok(true)
515 }
516 Err(e) if e.is("AUTH_KEY_UNREGISTERED") => Ok(false),
517 Err(e) => Err(e),
518 }
519 }
520
521 pub fn stream_updates(&self) -> UpdateStream {
529 let (tx, rx) = mpsc::unbounded_channel();
530 let client = self.clone();
535 tokio::spawn(async move {
536 client.run_update_loop(tx).await;
537 });
538 UpdateStream { rx }
539 }
540
541 async fn run_update_loop(&self, tx: mpsc::UnboundedSender<update::Update>) {
544 loop {
547 let result = {
550 let mut conn = self.inner.conn.lock().await;
551 match tokio::time::timeout(
553 Duration::from_secs(30),
554 conn.recv_once()
555 ).await {
556 Ok(Ok(updates)) => Ok(updates),
557 Ok(Err(e)) => Err(e),
558 Err(_timeout) => {
559 let _ = conn.send_ping().await;
561 continue;
562 }
563 }
564 };
565
566 match result {
567 Ok(updates) => {
568 for u in updates {
569 let _ = tx.send(u);
570 }
571 }
572 Err(e) => {
573 log::warn!("[layer] Update loop error: {e} — reconnecting …");
574 sleep(Duration::from_secs(1)).await;
575 let home_dc_id = *self.inner.home_dc_id.lock().await;
577 let addr = {
578 let opts = self.inner.dc_options.lock().await;
579 opts.get(&home_dc_id).map(|e| e.addr.clone()).unwrap_or_else(|| "149.154.167.51:443".to_string())
580 };
581 match Connection::connect_raw(&addr).await {
582 Ok(new_conn) => {
583 *self.inner.conn.lock().await = new_conn;
584 let _ = self.init_connection().await;
585 }
586 Err(e2) => {
587 log::error!("[layer] Reconnect failed: {e2}");
588 break;
589 }
590 }
591 }
592 }
593 }
594 }
595
596 pub async fn send_message(&self, peer: &str, text: &str) -> Result<(), InvocationError> {
600 let input_peer = self.resolve_peer(peer).await?;
601 self.send_message_to_peer(input_peer, text).await
602 }
603
604 pub async fn send_message_to_peer(
606 &self,
607 peer: layer_tl_types::enums::Peer,
608 text: &str,
609 ) -> Result<(), InvocationError> {
610 let input_peer = peer_to_input_peer(peer);
611 let req = layer_tl_types::functions::messages::SendMessage {
612 no_webpage: false,
613 silent: false,
614 background: false,
615 clear_draft: false,
616 noforwards: false,
617 update_stickersets_order: false,
618 invert_media: false,
619 allow_paid_floodskip: false,
620 peer: input_peer,
621 reply_to: None,
622 message: text.to_string(),
623 random_id: random_i64(),
624 reply_markup: None,
625 entities: None,
626 schedule_date: None,
627 schedule_repeat_period: None,
628 send_as: None,
629 quick_reply_shortcut: None,
630 effect: None,
631 allow_paid_stars: None,
632 suggested_post: None,
633 };
634 self.rpc_call_raw(&req).await?;
635 Ok(())
636 }
637
638 pub async fn send_to_self(&self, text: &str) -> Result<(), InvocationError> {
640 let req = layer_tl_types::functions::messages::SendMessage {
641 no_webpage: false,
642 silent: false,
643 background: false,
644 clear_draft: false,
645 noforwards: false,
646 update_stickersets_order: false,
647 invert_media: false,
648 allow_paid_floodskip: false,
649 peer: layer_tl_types::enums::InputPeer::PeerSelf,
650 reply_to: None,
651 message: text.to_string(),
652 random_id: random_i64(),
653 reply_markup: None,
654 entities: None,
655 schedule_date: None,
656 schedule_repeat_period: None,
657 send_as: None,
658 quick_reply_shortcut: None,
659 effect: None,
660 allow_paid_stars: None,
661 suggested_post: None,
662 };
663 self.rpc_call_raw(&req).await?;
664 Ok(())
665 }
666
    /// Deletes the messages with the given ids via `messages.deleteMessages`.
    ///
    /// `revoke` is forwarded unchanged in the request. The server's reply is
    /// discarded.
    pub async fn delete_messages(
        &self,
        message_ids: Vec<i32>,
        revoke: bool,
    ) -> Result<(), InvocationError> {
        let req = layer_tl_types::functions::messages::DeleteMessages {
            revoke,
            id: message_ids,
        };
        self.rpc_call_raw(&req).await?;
        Ok(())
    }
680
681 pub async fn answer_callback_query(
687 &self,
688 query_id: i64,
689 text: Option<&str>,
690 alert: bool,
691 ) -> Result<bool, InvocationError> {
692 let req = layer_tl_types::functions::messages::SetBotCallbackAnswer {
693 alert,
694 query_id,
695 message: text.map(|s| s.to_string()),
696 url: None,
697 cache_time: 0,
698 };
699 let body = self.rpc_call_raw(&req).await?;
700 Ok(!body.is_empty())
701 }
702
703 pub async fn answer_inline_query(
709 &self,
710 query_id: i64,
711 results: Vec<layer_tl_types::enums::InputBotInlineResult>,
712 cache_time: i32,
713 is_personal: bool,
714 next_offset: Option<String>,
715 ) -> Result<bool, InvocationError> {
716 let req = layer_tl_types::functions::messages::SetInlineBotResults {
717 gallery: false,
718 private: is_personal,
719 query_id,
720 results,
721 cache_time,
722 next_offset,
723 switch_pm: None,
724 switch_webview: None,
725 };
726 let body = self.rpc_call_raw(&req).await?;
727 Ok(!body.is_empty())
728 }
729
    /// Fetches up to `limit` dialogs, starting from the beginning of the list.
    ///
    /// A `NotModified` answer yields an empty vector.
    ///
    /// Note: the returned `Dialog`s only carry `raw`; `message` and `entity`
    /// are always `None` here even though the response includes `messages` and
    /// `users`, so `Dialog::title` returns `"(Unknown)"` for these values.
    pub async fn get_dialogs(&self, limit: i32) -> Result<Vec<Dialog>, InvocationError> {
        let req = layer_tl_types::functions::messages::GetDialogs {
            exclude_pinned: false,
            folder_id: None,
            offset_date: 0,
            offset_id: 0,
            offset_peer: layer_tl_types::enums::InputPeer::Empty,
            limit,
            hash: 0,
        };

        let body = self.rpc_call_raw(&req).await?;
        let mut cur = Cursor::from_slice(&body);
        let dialogs = match layer_tl_types::enums::messages::Dialogs::deserialize(&mut cur)? {
            layer_tl_types::enums::messages::Dialogs::Dialogs(d) => d,
            // Normalise a slice into the plain `Dialogs` shape.
            layer_tl_types::enums::messages::Dialogs::Slice(d) => {
                layer_tl_types::types::messages::Dialogs {
                    dialogs: d.dialogs, messages: d.messages, chats: d.chats, users: d.users,
                }
            }
            layer_tl_types::enums::messages::Dialogs::NotModified(_) => {
                return Ok(vec![]);
            }
        };

        let result = dialogs.dialogs.into_iter().map(|d| Dialog {
            raw: d,
            message: None,
            entity: None,
        }).collect();
        Ok(result)
    }
768
769 pub async fn get_messages(
773 &self,
774 peer: layer_tl_types::enums::InputPeer,
775 limit: i32,
776 offset_id: i32,
777 ) -> Result<Vec<update::IncomingMessage>, InvocationError> {
778 let req = layer_tl_types::functions::messages::GetHistory {
779 peer,
780 offset_id,
781 offset_date: 0,
782 add_offset: 0,
783 limit,
784 max_id: 0,
785 min_id: 0,
786 hash: 0,
787 };
788
789 let body = self.rpc_call_raw(&req).await?;
790 let mut cur = Cursor::from_slice(&body);
791 let messages = match layer_tl_types::enums::messages::Messages::deserialize(&mut cur)? {
792 layer_tl_types::enums::messages::Messages::Messages(m) => m.messages,
793 layer_tl_types::enums::messages::Messages::Slice(m) => m.messages,
794 layer_tl_types::enums::messages::Messages::ChannelMessages(m) => m.messages,
795 layer_tl_types::enums::messages::Messages::NotModified(_) => vec![],
796 };
797
798 Ok(messages.into_iter().map(update::IncomingMessage::from_raw).collect())
799 }
800
801 pub async fn resolve_peer(
806 &self,
807 peer: &str,
808 ) -> Result<layer_tl_types::enums::Peer, InvocationError> {
809 match peer.trim() {
810 "me" | "self" => Ok(layer_tl_types::enums::Peer::User(
811 layer_tl_types::types::PeerUser { user_id: 0 }
812 )),
813 username if username.starts_with('@') => {
814 self.resolve_username(&username[1..]).await
815 }
816 id_str => {
817 if let Ok(id) = id_str.parse::<i64>() {
818 Ok(layer_tl_types::enums::Peer::User(
819 layer_tl_types::types::PeerUser { user_id: id }
820 ))
821 } else {
822 Err(InvocationError::Deserialize(format!("cannot resolve peer: {peer}")))
823 }
824 }
825 }
826 }
827
828 async fn resolve_username(&self, username: &str) -> Result<layer_tl_types::enums::Peer, InvocationError> {
829 let req = layer_tl_types::functions::contacts::ResolveUsername { username: username.to_string(), referer: None };
830 let body = self.rpc_call_raw(&req).await?;
831 let mut cur = Cursor::from_slice(&body);
832 let resolved = match layer_tl_types::enums::contacts::ResolvedPeer::deserialize(&mut cur)? {
833 layer_tl_types::enums::contacts::ResolvedPeer::ResolvedPeer(r) => r,
834 };
835 Ok(resolved.peer)
836 }
837
838 pub async fn invoke<R: RemoteCall>(&self, req: &R) -> Result<R::Return, InvocationError> {
845 let body = self.rpc_call_raw(req).await?;
846 let mut cur = Cursor::from_slice(&body);
847 R::Return::deserialize(&mut cur).map_err(Into::into)
848 }
849
    /// Sends `req` and returns the raw reply bytes, retrying as directed by
    /// the configured retry policy.
    ///
    /// After each failure the policy receives the 1-based failure count, the
    /// total time slept so far and the error. `ControlFlow::Continue(delay)`
    /// sleeps for `delay` and tries again; `ControlFlow::Break` returns the
    /// error.
    async fn rpc_call_raw<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
        let mut fail_count = NonZeroU32::new(1).unwrap();
        let mut slept_so_far = Duration::default();

        loop {
            match self.do_rpc_call(req).await {
                Ok(body) => return Ok(body),
                Err(e) => {
                    // The context owns the error so it can be handed back on `Break`.
                    let ctx = RetryContext { fail_count, slept_so_far, error: e };
                    match self.inner.retry_policy.should_retry(&ctx) {
                        ControlFlow::Continue(delay) => {
                            sleep(delay).await;
                            slept_so_far += delay;
                            fail_count = fail_count.saturating_add(1);
                        }
                        ControlFlow::Break(()) => return Err(ctx.error),
                    }
                }
            }
        }
    }
872
873 async fn do_rpc_call<R: RemoteCall>(&self, req: &R) -> Result<Vec<u8>, InvocationError> {
874 let mut conn = self.inner.conn.lock().await;
875 conn.rpc_call(req).await
876 }
877
    /// Sends `invokeWithLayer(initConnection(help.getConfig))` and refreshes
    /// the DC table from the returned configuration.
    ///
    /// If the reply does not decode as a `Config`, the DC table is left
    /// untouched and the call still succeeds.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying RPC call.
    async fn init_connection(&self) -> Result<(), InvocationError> {
        use layer_tl_types::functions::{InvokeWithLayer, InitConnection, help::GetConfig};
        let req = InvokeWithLayer {
            layer: layer_tl_types::LAYER,
            query: InitConnection {
                api_id: self.inner.api_id,
                device_model: "Linux".to_string(),
                system_version: "1.0".to_string(),
                app_version: "0.1.0".to_string(),
                system_lang_code: "en".to_string(),
                lang_pack: "".to_string(),
                lang_code: "en".to_string(),
                proxy: None,
                params: None,
                query: GetConfig {},
            },
        };

        // Scope the guard so the connection lock is released before the DC
        // table lock is taken below.
        let body = {
            let mut conn = self.inner.conn.lock().await;
            conn.rpc_call_serializable(&req).await?
        };

        let mut cur = Cursor::from_slice(&body);
        if let Ok(layer_tl_types::enums::Config::Config(cfg)) =
            layer_tl_types::enums::Config::deserialize(&mut cur)
        {
            let mut opts = self.inner.dc_options.lock().await;
            for opt in &cfg.dc_options {
                let layer_tl_types::enums::DcOption::DcOption(o) = opt;
                // Only plain IPv4 endpoints are kept.
                if o.media_only || o.cdn || o.tcpo_only || o.ipv6 { continue; }
                let addr = format!("{}:{}", o.ip_address, o.port);
                // Keep any stored key for a known DC; only the address is
                // replaced. When a DC has several options, the last one wins.
                let entry = opts.entry(o.id).or_insert_with(|| DcEntry {
                    dc_id: o.id, addr: addr.clone(),
                    auth_key: None, first_salt: 0, time_offset: 0,
                });
                entry.addr = addr;
            }
            // The logged count includes the options skipped above.
            log::info!("[layer] initConnection ✓ ({} DCs known)", cfg.dc_options.len());
        }
        Ok(())
    }
922
923 async fn migrate_to(&self, new_dc_id: i32) -> Result<(), InvocationError> {
926 let addr = {
927 let opts = self.inner.dc_options.lock().await;
928 opts.get(&new_dc_id).map(|e| e.addr.clone())
929 .unwrap_or_else(|| "149.154.167.51:443".to_string())
930 };
931
932 log::info!("[layer] Migrating to DC{new_dc_id} ({addr}) …");
933
934 let saved_key = {
935 let opts = self.inner.dc_options.lock().await;
936 opts.get(&new_dc_id).and_then(|e| e.auth_key)
937 };
938
939 let conn = if let Some(key) = saved_key {
940 Connection::connect_with_key(&addr, key, 0, 0).await?
941 } else {
942 Connection::connect_raw(&addr).await?
943 };
944
945 let new_key = conn.auth_key_bytes();
946 {
947 let mut opts = self.inner.dc_options.lock().await;
948 let entry = opts.entry(new_dc_id).or_insert_with(|| DcEntry {
949 dc_id: new_dc_id, addr: addr.clone(),
950 auth_key: None, first_salt: 0, time_offset: 0,
951 });
952 entry.auth_key = Some(new_key);
953 }
954
955 *self.inner.conn.lock().await = conn;
956 *self.inner.home_dc_id.lock().await = new_dc_id;
957 self.init_connection().await?;
958 log::info!("[layer] Now on DC{new_dc_id} ✓");
959 Ok(())
960 }
961
962 async fn get_password_info(&self) -> Result<PasswordToken, InvocationError> {
965 let body = self.rpc_call_raw(&layer_tl_types::functions::account::GetPassword {}).await?;
966 let mut cur = Cursor::from_slice(&body);
967 let pw = match layer_tl_types::enums::account::Password::deserialize(&mut cur)? {
968 layer_tl_types::enums::account::Password::Password(p) => p,
969 };
970 Ok(PasswordToken { password: pw })
971 }
972
    /// Builds the `auth.sendCode` request for `phone` using the client's API
    /// credentials, with every code-settings flag off and no tokens.
    fn make_send_code_req(&self, phone: &str) -> layer_tl_types::functions::auth::SendCode {
        layer_tl_types::functions::auth::SendCode {
            phone_number: phone.to_string(),
            api_id: self.inner.api_id,
            api_hash: self.inner.api_hash.clone(),
            settings: layer_tl_types::enums::CodeSettings::CodeSettings(
                layer_tl_types::types::CodeSettings {
                    allow_flashcall: false, current_number: false, allow_app_hash: false,
                    allow_missed_call: false, allow_firebase: false, unknown_number: false,
                    logout_tokens: None, token: None, app_sandbox: None,
                },
            ),
        }
    }
987
988 fn extract_user_name(user: &layer_tl_types::enums::User) -> String {
989 match user {
990 layer_tl_types::enums::User::User(u) => {
991 format!("{} {}", u.first_name.as_deref().unwrap_or(""),
992 u.last_name.as_deref().unwrap_or(""))
993 .trim().to_string()
994 }
995 layer_tl_types::enums::User::Empty(_) => "(unknown)".into(),
996 }
997 }
998
    /// Borrows `(salt1, salt2, p, g)` from the one supported password KDF
    /// algorithm.
    ///
    /// # Errors
    ///
    /// Returns `InvocationError::Deserialize` for any other algorithm variant.
    fn extract_password_params(
        algo: &layer_tl_types::enums::PasswordKdfAlgo,
    ) -> Result<(&[u8], &[u8], &[u8], i32), InvocationError> {
        match algo {
            layer_tl_types::enums::PasswordKdfAlgo::Sha256Sha256Pbkdf2Hmacsha512iter100000Sha256ModPow(a) => {
                Ok((&a.salt1, &a.salt2, &a.p, a.g))
            }
            _ => Err(InvocationError::Deserialize("unsupported password KDF algo".into())),
        }
    }
1009}
1010
/// One TCP connection to a data center together with its encryption state.
struct Connection {
    /// Socket carrying length-prefixed frames (see `send_abridged`).
    stream: TcpStream,
    /// Packs outgoing and unpacks incoming encrypted messages; also holds the
    /// auth key, current salt and time offset.
    enc: EncryptedSession,
}
1018
1019impl Connection {
    /// Connects to `addr` and performs the key exchange to obtain a new auth
    /// key.
    ///
    /// After the TCP connect a single `0xef` byte is written (presumably the
    /// abridged-transport marker, matching `send_abridged`). The exchange then
    /// runs three request/response rounds over plaintext frames
    /// (`auth::step1` → `step2` → `step3`) and is completed by `auth::finish`.
    ///
    /// # Errors
    ///
    /// I/O failures are converted with `?`; failures of the handshake steps
    /// are reported as `InvocationError::Deserialize` carrying the step's
    /// error text.
    async fn connect_raw(addr: &str) -> Result<Self, InvocationError> {
        log::info!("[layer] Connecting to {addr} (DH) …");
        let mut stream = TcpStream::connect(addr).await?;

        stream.write_all(&[0xef]).await?;

        // Plaintext session used only to frame the handshake messages.
        let mut plain = Session::new();

        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        send_plain(&mut stream, &plain.pack(&req1).to_plaintext_bytes()).await?;
        let res_pq: layer_tl_types::enums::ResPq = recv_plain(&mut stream).await?;

        let (req2, s2) = auth::step2(s1, res_pq).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        send_plain(&mut stream, &plain.pack(&req2).to_plaintext_bytes()).await?;
        let dh: layer_tl_types::enums::ServerDhParams = recv_plain(&mut stream).await?;

        let (req3, s3) = auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        send_plain(&mut stream, &plain.pack(&req3).to_plaintext_bytes()).await?;
        let ans: layer_tl_types::enums::SetClientDhParamsAnswer = recv_plain(&mut stream).await?;

        let done = auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        log::info!("[layer] DH complete ✓");

        Ok(Self {
            stream,
            enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
        })
    }
1052
1053 async fn connect_with_key(
1054 addr: &str,
1055 auth_key: [u8; 256],
1056 first_salt: i64,
1057 time_offset: i32,
1058 ) -> Result<Self, InvocationError> {
1059 let mut stream = TcpStream::connect(addr).await?;
1060 stream.write_all(&[0xef]).await?;
1061 Ok(Self {
1062 stream,
1063 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
1064 })
1065 }
1066
    /// Returns the 256-byte auth key of this connection's encrypted session.
    fn auth_key_bytes(&self) -> [u8; 256] { self.enc.auth_key_bytes() }
    /// Returns the session's current salt; `recv_rpc`/`recv_once` overwrite it
    /// whenever an incoming message carries a non-zero salt.
    fn first_salt(&self) -> i64 { self.enc.salt }
    /// Returns the session's time offset.
    fn time_offset(&self) -> i32 { self.enc.time_offset }
1070
1071 async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
1072 let wire = self.enc.pack(req);
1073 send_abridged(&mut self.stream, &wire).await?;
1074 self.recv_rpc().await
1075 }
1076
1077 async fn rpc_call_serializable<S: layer_tl_types::Serializable>(&mut self, req: &S) -> Result<Vec<u8>, InvocationError> {
1080 let wire = self.enc.pack_serializable(req);
1081 send_abridged(&mut self.stream, &wire).await?;
1082 self.recv_rpc().await
1083 }
1084
    /// Reads frames until one unwraps to an RPC payload and returns it.
    ///
    /// Service messages are skipped. Updates that arrive while waiting are
    /// only counted in a debug log line and then discarded — they are not
    /// forwarded to any update stream.
    ///
    /// # Errors
    ///
    /// Returns transport errors, decryption failures (as
    /// `InvocationError::Deserialize`) and any error produced by
    /// `unwrap_envelope`, including RPC errors.
    async fn recv_rpc(&mut self) -> Result<Vec<u8>, InvocationError> {
        loop {
            let mut raw = recv_abridged(&mut self.stream).await?;
            let msg = self.enc.unpack(&mut raw)
                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            // Adopt the salt carried by the incoming message when it is non-zero.
            if msg.salt != 0 { self.enc.salt = msg.salt; }
            match unwrap_envelope(msg.body)? {
                EnvelopeResult::Payload(p) => return Ok(p),
                EnvelopeResult::Updates(updates) => {
                    log::debug!("[layer] {} updates received during RPC call", updates.len());
                }
                EnvelopeResult::None => {}
            }
        }
    }
1102
1103 async fn recv_once(&mut self) -> Result<Vec<update::Update>, InvocationError> {
1105 let mut raw = recv_abridged(&mut self.stream).await?;
1106 let msg = self.enc.unpack(&mut raw)
1107 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
1108 if msg.salt != 0 { self.enc.salt = msg.salt; }
1109 match unwrap_envelope(msg.body)? {
1110 EnvelopeResult::Updates(updates) => Ok(updates),
1111 _ => Ok(vec![]),
1112 }
1113 }
1114
1115 async fn send_ping(&mut self) -> Result<(), InvocationError> {
1116 let ping_id = random_i64();
1117 let req = layer_tl_types::functions::Ping { ping_id };
1118 let wire = self.enc.pack(&req);
1119 send_abridged(&mut self.stream, &wire).await?;
1120 Ok(())
1121 }
1122}
1123
1124async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
1127 let words = data.len() / 4;
1128 if words < 0x7f {
1129 stream.write_all(&[words as u8]).await?;
1130 } else {
1131 let b = [0x7f, (words & 0xff) as u8, ((words >> 8) & 0xff) as u8, ((words >> 16) & 0xff) as u8];
1132 stream.write_all(&b).await?;
1133 }
1134 stream.write_all(data).await?;
1135 Ok(())
1136}
1137
1138async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
1139 let mut h = [0u8; 1];
1140 stream.read_exact(&mut h).await?;
1141 let words = if h[0] < 0x7f {
1142 h[0] as usize
1143 } else {
1144 let mut b = [0u8; 3];
1145 stream.read_exact(&mut b).await?;
1146 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
1147 };
1148 let mut buf = vec![0u8; words * 4];
1149 stream.read_exact(&mut buf).await?;
1150 Ok(buf)
1151}
1152
/// Sends an unencrypted (handshake) message; identical to `send_abridged`
/// and kept as the counterpart of `recv_plain`.
async fn send_plain(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
    send_abridged(stream, data).await
}
1156
1157async fn recv_plain<T: Deserializable>(stream: &mut TcpStream) -> Result<T, InvocationError> {
1158 let raw = recv_abridged(stream).await?;
1159 if raw.len() < 20 { return Err(InvocationError::Deserialize("plaintext frame too short".into())); }
1160 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1161 return Err(InvocationError::Deserialize("expected auth_key_id=0 in plaintext".into()));
1162 }
1163 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1164 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1165 T::deserialize(&mut cur).map_err(Into::into)
1166}
1167
/// Outcome of unwrapping one decrypted message body (see `unwrap_envelope`).
enum EnvelopeResult {
    /// Raw bytes of an RPC result, ready to be deserialized by the caller.
    Payload(Vec<u8>),
    /// Updates parsed from an update envelope.
    Updates(Vec<update::Update>),
    /// A service message or an empty container: nothing for the caller to use.
    None,
}
1175
/// Peels protocol wrappers off a message body and classifies what is inside.
///
/// The leading little-endian `u32` selects the handling:
/// * RPC result — skip the 12-byte header and unwrap the rest;
/// * RPC error — returned as `Err(InvocationError::Rpc)`;
/// * container — unwrap each inner message; a payload wins over updates,
///   and if several payloads are present the last one is kept;
/// * gzip-packed — inflate and unwrap the result;
/// * pong / acks / salt and session notices — `EnvelopeResult::None`;
/// * update envelopes — parsed into `EnvelopeResult::Updates`;
/// * anything else — returned unchanged as `EnvelopeResult::Payload`.
///
/// # Errors
///
/// Returns `InvocationError::Deserialize` for truncated headers or failed
/// decompression, and `InvocationError::Rpc` for RPC errors (including ones
/// nested inside a container).
fn unwrap_envelope(body: Vec<u8>) -> Result<EnvelopeResult, InvocationError> {
    if body.len() < 4 { return Err(InvocationError::Deserialize("body < 4 bytes".into())); }
    let cid = u32::from_le_bytes(body[..4].try_into().unwrap());

    match cid {
        ID_RPC_RESULT => {
            // 4-byte constructor + 8 bytes (skipped here) precede the result.
            if body.len() < 12 { return Err(InvocationError::Deserialize("rpc_result too short".into())); }
            unwrap_envelope(body[12..].to_vec())
        }
        ID_RPC_ERROR => {
            if body.len() < 8 { return Err(InvocationError::Deserialize("rpc_error too short".into())); }
            let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
            // A malformed message string degrades to an empty message.
            let message = tl_read_string(&body[8..]).unwrap_or_default();
            Err(InvocationError::Rpc(RpcError::from_telegram(code, &message)))
        }
        ID_MSG_CONTAINER => {
            if body.len() < 8 { return Err(InvocationError::Deserialize("container too short".into())); }
            let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
            let mut pos = 8usize;
            let mut payload: Option<Vec<u8>> = None;
            let mut updates_buf: Vec<update::Update> = Vec::new();

            for _ in 0..count {
                // Each inner message has a 16-byte header whose last 4 bytes
                // are the body length; truncated entries end the scan quietly.
                if pos + 16 > body.len() { break; }
                let inner_len = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
                pos += 16;
                if pos + inner_len > body.len() { break; }
                let inner = body[pos..pos + inner_len].to_vec();
                pos += inner_len;
                match unwrap_envelope(inner)? {
                    EnvelopeResult::Payload(p) => { payload = Some(p); }
                    EnvelopeResult::Updates(us) => { updates_buf.extend(us); }
                    EnvelopeResult::None => {}
                }
            }
            // When a payload is present, updates collected from the same
            // container are dropped.
            if let Some(p) = payload {
                Ok(EnvelopeResult::Payload(p))
            } else if !updates_buf.is_empty() {
                Ok(EnvelopeResult::Updates(updates_buf))
            } else {
                Ok(EnvelopeResult::None)
            }
        }
        ID_GZIP_PACKED => {
            // A malformed byte string becomes empty input for `gz_inflate`.
            let bytes = tl_read_bytes(&body[4..]).unwrap_or_default();
            unwrap_envelope(gz_inflate(&bytes)?)
        }
        ID_PONG | ID_MSGS_ACK | ID_NEW_SESSION | ID_BAD_SERVER_SALT | ID_BAD_MSG_NOTIFY => {
            Ok(EnvelopeResult::None)
        }
        ID_UPDATES | ID_UPDATE_SHORT | ID_UPDATES_COMBINED | ID_UPDATE_SHORT_MSG => {
            parse_updates_envelope(cid, &body)
        }
        _ => Ok(EnvelopeResult::Payload(body)),
    }
}
1233
/// Parses an update envelope with `update::parse_updates` and wraps the
/// result in `EnvelopeResult::Updates`.
///
/// `_cid` is unused; the full body, including its constructor id, is passed
/// to the parser. This function never returns `Err`.
fn parse_updates_envelope(_cid: u32, body: &[u8]) -> Result<EnvelopeResult, InvocationError> {
    let updates = update::parse_updates(body);
    Ok(EnvelopeResult::Updates(updates))
}
1238
/// Converts a `Peer` into the `InputPeer` form used in requests.
///
/// A user peer with id `0` is the sentinel produced by `resolve_peer` for
/// `"me"`/`"self"` and becomes `InputPeer::PeerSelf`.
///
/// Users and channels are built with `access_hash: 0` because a `Peer`
/// carries no access hash.
/// NOTE(review): confirm whether the server accepts a zero access hash for
/// the peers this is used with.
fn peer_to_input_peer(peer: layer_tl_types::enums::Peer) -> layer_tl_types::enums::InputPeer {
    match peer {
        layer_tl_types::enums::Peer::User(u) => {
            if u.user_id == 0 {
                layer_tl_types::enums::InputPeer::PeerSelf
            } else {
                layer_tl_types::enums::InputPeer::User(
                    layer_tl_types::types::InputPeerUser { user_id: u.user_id, access_hash: 0 }
                )
            }
        }
        layer_tl_types::enums::Peer::Chat(c) => {
            layer_tl_types::enums::InputPeer::Chat(
                layer_tl_types::types::InputPeerChat { chat_id: c.chat_id }
            )
        }
        layer_tl_types::enums::Peer::Channel(c) => {
            layer_tl_types::enums::InputPeer::Channel(
                layer_tl_types::types::InputPeerChannel { channel_id: c.channel_id, access_hash: 0 }
            )
        }
    }
}
1264
/// Returns a random `i64` built from 8 bytes supplied by `getrandom`.
///
/// # Panics
///
/// Panics if `getrandom` reports an error.
fn random_i64() -> i64 {
    let mut b = [0u8; 8];
    getrandom::getrandom(&mut b).expect("getrandom");
    i64::from_le_bytes(b)
}
1270
/// Reads a TL-encoded byte string from the start of `data`.
///
/// A first byte below 254 is the length itself and the content starts at
/// offset 1. Otherwise the length is taken from the next three bytes
/// (little-endian) and the content starts at offset 4. Trailing padding is
/// not consumed or validated.
///
/// Returns an empty vector for empty input, and `None` when the header or the
/// declared content does not fit in `data`.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let first = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };

    let (start, len) = match first {
        0..=253 => (1, usize::from(first)),
        _ if data.len() >= 4 => {
            (4, u32::from_le_bytes([data[1], data[2], data[3], 0]) as usize)
        }
        _ => return None,
    };

    data.get(start..start + len).map(|content| content.to_vec())
}
1279
1280fn tl_read_string(data: &[u8]) -> Option<String> {
1281 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
1282}
1283
1284fn gz_inflate(data: &[u8]) -> Result<Vec<u8>, InvocationError> {
1285 use std::io::Read;
1286 let mut out = Vec::new();
1287 if flate2::read::GzDecoder::new(data).read_to_end(&mut out).is_ok() && !out.is_empty() {
1288 return Ok(out);
1289 }
1290 out.clear();
1291 flate2::read::ZlibDecoder::new(data)
1292 .read_to_end(&mut out)
1293 .map_err(|_| InvocationError::Deserialize("decompression failed".into()))?;
1294 Ok(out)
1295}