grammers_client/client/
net.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8use super::{Client, ClientInner};
9use crate::client::client::ClientConfiguration;
10use grammers_mtsender::{InvocationError, RpcError, SenderPool};
11use grammers_tl_types::{self as tl, Deserializable};
12use log::info;
13use std::sync::Arc;
14use tokio::{sync::Mutex, time::sleep};
15
16/// Method implementations directly related with network connectivity.
17impl Client {
18    /// Creates and returns a new client instance upon successful connection to Telegram.
19    ///
20    /// If the session in the configuration did not have an authorization key, a new one
21    /// will be created and the session will be saved with it.
22    ///
23    /// The connection will be initialized with the data from the input configuration.
24    ///
25    /// The [`grammers_mtsender::SenderPoolHandle`] does not keep a reference to the [`grammers_session::Session`]
26    /// or `api_id`, but the [`SenderPool`] itself does, so the latter is used as input to guarantee
27    /// that the values are correctly shared between the pool and the client handles.
28    ///
29    /// # Examples
30    ///
31    /// ```
32    /// use std::sync::Arc;
33    /// use grammers_client::Client;
34    /// use grammers_session::storages::SqliteSession;
35    /// use grammers_mtsender::SenderPool;
36    ///
37    /// // Note: these are example values and are not actually valid.
38    /// //       Obtain your own with the developer's phone at https://my.telegram.org.
39    /// const API_ID: i32 = 932939;
40    ///
41    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
42    /// let session = Arc::new(SqliteSession::open("hello-world.session")?);
43    /// let pool = SenderPool::new(Arc::clone(&session), API_ID);
44    /// let client = Client::new(&pool);
45    /// # Ok(())
46    /// # }
47    /// ```
48    pub fn new(sender_pool: &SenderPool) -> Self {
49        Self::with_configuration(sender_pool, Default::default())
50    }
51
52    /// Like [`Self::new`] but with a custom [`ClientConfiguration`].
53    pub fn with_configuration(
54        sender_pool: &SenderPool,
55        configuration: ClientConfiguration,
56    ) -> Self {
57        // TODO Sender doesn't have a way to handle backpressure yet
58        Self(Arc::new(ClientInner {
59            session: Arc::clone(&sender_pool.runner.session),
60            api_id: sender_pool.runner.api_id,
61            handle: sender_pool.handle.clone(),
62            configuration,
63            auth_copied_to_dcs: Mutex::new(Vec::new()),
64        }))
65    }
66
67    /// Invoke a raw API call. This directly sends the request to Telegram's servers.
68    ///
69    /// Using function definitions corresponding to a different layer is likely to cause the
70    /// responses to the request to not be understood.
71    ///
72    /// <div class="stab unstable">
73    ///
74    /// **Warning**: this method is **not** part of the stability guarantees of semantic
75    /// versioning. It **may** break during *minor* version changes (but not on patch version
76    /// changes). Use with care.
77    ///
78    /// </div>
79    ///
80    /// # Examples
81    ///
82    /// ```
83    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
84    /// use grammers_tl_types as tl;
85    ///
86    /// dbg!(client.invoke(&tl::functions::Ping { ping_id: 0 }).await?);
87    /// # Ok(())
88    /// # }
89    /// ```
90    pub async fn invoke<R: tl::RemoteCall>(
91        &self,
92        request: &R,
93    ) -> Result<R::Return, InvocationError> {
94        let dc_id = self.0.session.home_dc_id();
95        self.do_invoke_in_dc(dc_id, request.to_bytes())
96            .await
97            .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
98    }
99
100    /// Like [`Self::invoke`], but in the specified DC.
101    pub async fn invoke_in_dc<R: tl::RemoteCall>(
102        &self,
103        dc_id: i32,
104        request: &R,
105    ) -> Result<R::Return, InvocationError> {
106        self.do_invoke_in_dc(dc_id, request.to_bytes())
107            .await
108            .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
109    }
110
111    async fn do_invoke_in_dc(
112        &self,
113        dc_id: i32,
114        request_body: Vec<u8>,
115    ) -> Result<Vec<u8>, InvocationError> {
116        let mut slept_flood = false;
117
118        loop {
119            match self
120                .0
121                .handle
122                .invoke_in_dc(dc_id, request_body.clone())
123                .await
124            {
125                Ok(response) => break Ok(response),
126                Err(InvocationError::Rpc(RpcError {
127                    name,
128                    code: 420,
129                    value: Some(seconds),
130                    ..
131                })) if !slept_flood && seconds <= self.0.configuration.flood_sleep_threshold => {
132                    let delay = std::time::Duration::from_secs(seconds as _);
133                    info!("sleeping on {} for {:?} before retrying", name, delay,);
134                    sleep(delay).await;
135                    slept_flood = true;
136                    continue;
137                }
138                Err(e) => break Err(e),
139            }
140        }
141    }
142
143    pub(crate) async fn copy_auth_to_dc(&self, target_dc_id: i32) -> Result<(), InvocationError> {
144        let mut auth_copied_to_dcs = self.0.auth_copied_to_dcs.lock().await;
145        if auth_copied_to_dcs.contains(&target_dc_id) {
146            return Ok(());
147        }
148
149        let home_dc_id = self.0.session.home_dc_id();
150        if target_dc_id == home_dc_id {
151            return Ok(());
152        }
153
154        let tl::enums::auth::ExportedAuthorization::Authorization(exported_auth) = self
155            .invoke(&tl::functions::auth::ExportAuthorization {
156                dc_id: target_dc_id,
157            })
158            .await?;
159
160        self.invoke_in_dc(
161            target_dc_id,
162            &tl::functions::auth::ImportAuthorization {
163                id: exported_auth.id,
164                bytes: exported_auth.bytes,
165            },
166        )
167        .await?;
168
169        auth_copied_to_dcs.push(target_dc_id);
170
171        Ok(())
172    }
173}