Skip to main content

grammers_client/client/
net.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use std::num::NonZeroU32;
10use std::ops::ControlFlow;
11use std::sync::Arc;
12use std::time::Duration;
13
14use grammers_mtsender::{InvocationError, SenderPoolFatHandle};
15use grammers_tl_types::{self as tl, Deserializable};
16use log::info;
17use tokio::{sync::Mutex, time::sleep};
18
19use super::{Client, ClientConfiguration, ClientInner, RetryContext};
20
/// Method implementations directly related to network connectivity.
22impl Client {
23    /// Creates and returns a new client instance upon successful connection to Telegram.
24    ///
25    /// If the session in the configuration did not have an authorization key, a new one
26    /// will be created and the session will be saved with it.
27    ///
28    /// The connection will be initialized with the data from the input configuration.
29    ///
30    /// The [`grammers_mtsender::SenderPoolHandle`] does not keep a reference to the [`grammers_session::Session`]
31    /// or `api_id`, but the [`SenderPool`] itself does, so the latter is used as input to guarantee
32    /// that the values are correctly shared between the pool and the client handles.
33    ///
34    /// # Examples
35    ///
36    /// ```
37    /// use std::sync::Arc;
38    /// use grammers_client::Client;
39    /// use grammers_session::storages::MemorySession; // avoid this storage outside tests!
40    /// use grammers_mtsender::SenderPool;
41    ///
42    /// // Note: these are example values and are not actually valid.
43    /// //       Obtain your own with the developer's phone at https://my.telegram.org.
44    /// const API_ID: i32 = 932939;
45    ///
46    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
47    /// let session = Arc::new(MemorySession::default());
48    /// let pool = SenderPool::new(Arc::clone(&session), API_ID);
49    /// let client = Client::new(pool.handle);
50    /// # Ok(())
51    /// # }
52    /// ```
53    pub fn new(sender_pool: SenderPoolFatHandle) -> Self {
54        Self::with_configuration(sender_pool, Default::default())
55    }
56
57    /// Like [`Self::new`] but with a custom [`ClientConfiguration`].
58    pub fn with_configuration(
59        sender_pool: SenderPoolFatHandle,
60        configuration: ClientConfiguration,
61    ) -> Self {
62        // TODO Sender doesn't have a way to handle backpressure yet
63        Self(Arc::new(ClientInner {
64            session: sender_pool.session,
65            api_id: sender_pool.api_id,
66            handle: sender_pool.thin,
67            configuration,
68            auth_copied_to_dcs: Mutex::new(Vec::new()),
69        }))
70    }
71
72    /// Invoke a raw API call. This directly sends the request to Telegram's servers.
73    ///
74    /// Using function definitions corresponding to a different layer is likely to cause the
75    /// responses to the request to not be understood.
76    ///
77    /// <div class="warning">
78    ///
79    /// This method is **not** part of the stability guarantees of semantic
80    /// versioning. It **may** break during *minor* version changes (but not on patch version
81    /// changes). Use with care.
82    ///
83    /// </div>
84    ///
85    /// # Examples
86    ///
87    /// ```
88    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
89    /// use grammers_tl_types as tl;
90    ///
91    /// dbg!(client.invoke(&tl::functions::Ping { ping_id: 0 }).await?);
92    /// # Ok(())
93    /// # }
94    /// ```
95    pub async fn invoke<R: tl::RemoteCall>(
96        &self,
97        request: &R,
98    ) -> Result<R::Return, InvocationError> {
99        let dc_id = self.0.session.home_dc_id();
100        self.do_invoke_in_dc(dc_id, request.to_bytes())
101            .await
102            .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
103    }
104
105    /// Like [`Self::invoke`], but in the specified DC.
106    ///
107    /// <div class="warning">
108    ///
109    /// This method is **not** part of the stability guarantees of semantic
110    /// versioning. It **may** break during *minor* version changes (but not on patch version
111    /// changes). Use with care.
112    ///
113    /// </div>
114    pub async fn invoke_in_dc<R: tl::RemoteCall>(
115        &self,
116        dc_id: i32,
117        request: &R,
118    ) -> Result<R::Return, InvocationError> {
119        self.do_invoke_in_dc(dc_id, request.to_bytes())
120            .await
121            .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
122    }
123
124    async fn do_invoke_in_dc(
125        &self,
126        dc_id: i32,
127        request_body: Vec<u8>,
128    ) -> Result<Vec<u8>, InvocationError> {
129        let mut retry_context = RetryContext {
130            fail_count: NonZeroU32::new(1).unwrap(),
131            slept_so_far: Duration::default(),
132            error: InvocationError::Dropped,
133        };
134
135        loop {
136            match self
137                .0
138                .handle
139                .invoke_in_dc(dc_id, request_body.clone())
140                .await
141            {
142                Ok(response) => break Ok(response),
143                Err(e) => {
144                    retry_context.error = e;
145                    match self
146                        .0
147                        .configuration
148                        .retry_policy
149                        .should_retry(&retry_context)
150                    {
151                        ControlFlow::Continue(delay) => {
152                            info!(
153                                "sleeping on {} for {:?} before retrying",
154                                retry_context.error, delay,
155                            );
156                            sleep(delay).await;
157                            retry_context.fail_count = retry_context.fail_count.saturating_add(1);
158                            retry_context.slept_so_far += delay;
159                            continue;
160                        }
161                        ControlFlow::Break(()) => break Err(retry_context.error),
162                    }
163                }
164            }
165        }
166    }
167
168    pub(crate) async fn copy_auth_to_dc(&self, target_dc_id: i32) -> Result<(), InvocationError> {
169        let mut auth_copied_to_dcs = self.0.auth_copied_to_dcs.lock().await;
170        if auth_copied_to_dcs.contains(&target_dc_id) {
171            return Ok(());
172        }
173
174        let home_dc_id = self.0.session.home_dc_id();
175        if target_dc_id == home_dc_id {
176            return Ok(());
177        }
178
179        let tl::enums::auth::ExportedAuthorization::Authorization(exported_auth) = self
180            .invoke(&tl::functions::auth::ExportAuthorization {
181                dc_id: target_dc_id,
182            })
183            .await?;
184
185        self.invoke_in_dc(
186            target_dc_id,
187            &tl::functions::auth::ImportAuthorization {
188                id: exported_auth.id,
189                bytes: exported_auth.bytes,
190            },
191        )
192        .await?;
193
194        auth_copied_to_dcs.push(target_dc_id);
195
196        Ok(())
197    }
198}