grammers_client/client/net.rs
1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8use super::{Client, ClientInner};
9use crate::client::client::ClientConfiguration;
10use grammers_mtsender::{InvocationError, RpcError, SenderPool};
11use grammers_tl_types::{self as tl, Deserializable};
12use log::info;
13use std::sync::Arc;
14use tokio::{sync::Mutex, time::sleep};
15
16/// Method implementations directly related with network connectivity.
17impl Client {
18 /// Creates and returns a new client instance upon successful connection to Telegram.
19 ///
20 /// If the session in the configuration did not have an authorization key, a new one
21 /// will be created and the session will be saved with it.
22 ///
23 /// The connection will be initialized with the data from the input configuration.
24 ///
25 /// The [`grammers_mtsender::SenderPoolHandle`] does not keep a reference to the [`grammers_session::Session`]
26 /// or `api_id`, but the [`SenderPool`] itself does, so the latter is used as input to guarantee
27 /// that the values are correctly shared between the pool and the client handles.
28 ///
29 /// # Examples
30 ///
31 /// ```
32 /// use std::sync::Arc;
33 /// use grammers_client::Client;
34 /// use grammers_session::storages::SqliteSession;
35 /// use grammers_mtsender::SenderPool;
36 ///
37 /// // Note: these are example values and are not actually valid.
38 /// // Obtain your own with the developer's phone at https://my.telegram.org.
39 /// const API_ID: i32 = 932939;
40 ///
41 /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
42 /// let session = Arc::new(SqliteSession::open("hello-world.session")?);
43 /// let pool = SenderPool::new(Arc::clone(&session), API_ID);
44 /// let client = Client::new(&pool);
45 /// # Ok(())
46 /// # }
47 /// ```
48 pub fn new(sender_pool: &SenderPool) -> Self {
49 Self::with_configuration(sender_pool, Default::default())
50 }
51
52 /// Like [`Self::new`] but with a custom [`ClientConfiguration`].
53 pub fn with_configuration(
54 sender_pool: &SenderPool,
55 configuration: ClientConfiguration,
56 ) -> Self {
57 // TODO Sender doesn't have a way to handle backpressure yet
58 Self(Arc::new(ClientInner {
59 session: Arc::clone(&sender_pool.runner.session),
60 api_id: sender_pool.runner.api_id,
61 handle: sender_pool.handle.clone(),
62 configuration,
63 auth_copied_to_dcs: Mutex::new(Vec::new()),
64 }))
65 }
66
67 /// Invoke a raw API call. This directly sends the request to Telegram's servers.
68 ///
69 /// Using function definitions corresponding to a different layer is likely to cause the
70 /// responses to the request to not be understood.
71 ///
72 /// <div class="stab unstable">
73 ///
74 /// **Warning**: this method is **not** part of the stability guarantees of semantic
75 /// versioning. It **may** break during *minor* version changes (but not on patch version
76 /// changes). Use with care.
77 ///
78 /// </div>
79 ///
80 /// # Examples
81 ///
82 /// ```
83 /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
84 /// use grammers_tl_types as tl;
85 ///
86 /// dbg!(client.invoke(&tl::functions::Ping { ping_id: 0 }).await?);
87 /// # Ok(())
88 /// # }
89 /// ```
90 pub async fn invoke<R: tl::RemoteCall>(
91 &self,
92 request: &R,
93 ) -> Result<R::Return, InvocationError> {
94 let dc_id = self.0.session.home_dc_id();
95 self.do_invoke_in_dc(dc_id, request.to_bytes())
96 .await
97 .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
98 }
99
100 /// Like [`Self::invoke`], but in the specified DC.
101 pub async fn invoke_in_dc<R: tl::RemoteCall>(
102 &self,
103 dc_id: i32,
104 request: &R,
105 ) -> Result<R::Return, InvocationError> {
106 self.do_invoke_in_dc(dc_id, request.to_bytes())
107 .await
108 .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
109 }
110
111 async fn do_invoke_in_dc(
112 &self,
113 dc_id: i32,
114 request_body: Vec<u8>,
115 ) -> Result<Vec<u8>, InvocationError> {
116 let mut slept_flood = false;
117
118 loop {
119 match self
120 .0
121 .handle
122 .invoke_in_dc(dc_id, request_body.clone())
123 .await
124 {
125 Ok(response) => break Ok(response),
126 Err(InvocationError::Rpc(RpcError {
127 name,
128 code: 420,
129 value: Some(seconds),
130 ..
131 })) if !slept_flood && seconds <= self.0.configuration.flood_sleep_threshold => {
132 let delay = std::time::Duration::from_secs(seconds as _);
133 info!("sleeping on {} for {:?} before retrying", name, delay,);
134 sleep(delay).await;
135 slept_flood = true;
136 continue;
137 }
138 Err(e) => break Err(e),
139 }
140 }
141 }
142
143 pub(crate) async fn copy_auth_to_dc(&self, target_dc_id: i32) -> Result<(), InvocationError> {
144 let mut auth_copied_to_dcs = self.0.auth_copied_to_dcs.lock().await;
145 if auth_copied_to_dcs.contains(&target_dc_id) {
146 return Ok(());
147 }
148
149 let home_dc_id = self.0.session.home_dc_id();
150 if target_dc_id == home_dc_id {
151 return Ok(());
152 }
153
154 let tl::enums::auth::ExportedAuthorization::Authorization(exported_auth) = self
155 .invoke(&tl::functions::auth::ExportAuthorization {
156 dc_id: target_dc_id,
157 })
158 .await?;
159
160 self.invoke_in_dc(
161 target_dc_id,
162 &tl::functions::auth::ImportAuthorization {
163 id: exported_auth.id,
164 bytes: exported_auth.bytes,
165 },
166 )
167 .await?;
168
169 auth_copied_to_dcs.push(target_dc_id);
170
171 Ok(())
172 }
173}