grammers_client/client/net.rs
1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use std::num::NonZeroU32;
10use std::ops::ControlFlow;
11use std::sync::Arc;
12use std::time::Duration;
13
14use grammers_mtsender::{InvocationError, SenderPoolFatHandle};
15use grammers_tl_types::{self as tl, Deserializable};
16use log::info;
17use tokio::{sync::Mutex, time::sleep};
18
19use super::{Client, ClientConfiguration, ClientInner, RetryContext};
20
21/// Method implementations directly related with network connectivity.
22impl Client {
23 /// Creates and returns a new client instance upon successful connection to Telegram.
24 ///
25 /// If the session in the configuration did not have an authorization key, a new one
26 /// will be created and the session will be saved with it.
27 ///
28 /// The connection will be initialized with the data from the input configuration.
29 ///
30 /// The [`grammers_mtsender::SenderPoolHandle`] does not keep a reference to the [`grammers_session::Session`]
31 /// or `api_id`, but the [`SenderPool`] itself does, so the latter is used as input to guarantee
32 /// that the values are correctly shared between the pool and the client handles.
33 ///
34 /// # Examples
35 ///
36 /// ```
37 /// use std::sync::Arc;
38 /// use grammers_client::Client;
39 /// use grammers_session::storages::MemorySession; // avoid this storage outside tests!
40 /// use grammers_mtsender::SenderPool;
41 ///
42 /// // Note: these are example values and are not actually valid.
43 /// // Obtain your own with the developer's phone at https://my.telegram.org.
44 /// const API_ID: i32 = 932939;
45 ///
46 /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
47 /// let session = Arc::new(MemorySession::default());
48 /// let pool = SenderPool::new(Arc::clone(&session), API_ID);
49 /// let client = Client::new(pool.handle);
50 /// # Ok(())
51 /// # }
52 /// ```
53 pub fn new(sender_pool: SenderPoolFatHandle) -> Self {
54 Self::with_configuration(sender_pool, Default::default())
55 }
56
57 /// Like [`Self::new`] but with a custom [`ClientConfiguration`].
58 pub fn with_configuration(
59 sender_pool: SenderPoolFatHandle,
60 configuration: ClientConfiguration,
61 ) -> Self {
62 // TODO Sender doesn't have a way to handle backpressure yet
63 Self(Arc::new(ClientInner {
64 session: sender_pool.session,
65 api_id: sender_pool.api_id,
66 handle: sender_pool.thin,
67 configuration,
68 auth_copied_to_dcs: Mutex::new(Vec::new()),
69 }))
70 }
71
72 /// Invoke a raw API call. This directly sends the request to Telegram's servers.
73 ///
74 /// Using function definitions corresponding to a different layer is likely to cause the
75 /// responses to the request to not be understood.
76 ///
77 /// <div class="warning">
78 ///
79 /// This method is **not** part of the stability guarantees of semantic
80 /// versioning. It **may** break during *minor* version changes (but not on patch version
81 /// changes). Use with care.
82 ///
83 /// </div>
84 ///
85 /// # Examples
86 ///
87 /// ```
88 /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
89 /// use grammers_tl_types as tl;
90 ///
91 /// dbg!(client.invoke(&tl::functions::Ping { ping_id: 0 }).await?);
92 /// # Ok(())
93 /// # }
94 /// ```
95 pub async fn invoke<R: tl::RemoteCall>(
96 &self,
97 request: &R,
98 ) -> Result<R::Return, InvocationError> {
99 let dc_id = self.0.session.home_dc_id();
100 self.do_invoke_in_dc(dc_id, request.to_bytes())
101 .await
102 .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
103 }
104
105 /// Like [`Self::invoke`], but in the specified DC.
106 ///
107 /// <div class="warning">
108 ///
109 /// This method is **not** part of the stability guarantees of semantic
110 /// versioning. It **may** break during *minor* version changes (but not on patch version
111 /// changes). Use with care.
112 ///
113 /// </div>
114 pub async fn invoke_in_dc<R: tl::RemoteCall>(
115 &self,
116 dc_id: i32,
117 request: &R,
118 ) -> Result<R::Return, InvocationError> {
119 self.do_invoke_in_dc(dc_id, request.to_bytes())
120 .await
121 .and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
122 }
123
124 async fn do_invoke_in_dc(
125 &self,
126 dc_id: i32,
127 request_body: Vec<u8>,
128 ) -> Result<Vec<u8>, InvocationError> {
129 let mut retry_context = RetryContext {
130 fail_count: NonZeroU32::new(1).unwrap(),
131 slept_so_far: Duration::default(),
132 error: InvocationError::Dropped,
133 };
134
135 loop {
136 match self
137 .0
138 .handle
139 .invoke_in_dc(dc_id, request_body.clone())
140 .await
141 {
142 Ok(response) => break Ok(response),
143 Err(e) => {
144 retry_context.error = e;
145 match self
146 .0
147 .configuration
148 .retry_policy
149 .should_retry(&retry_context)
150 {
151 ControlFlow::Continue(delay) => {
152 info!(
153 "sleeping on {} for {:?} before retrying",
154 retry_context.error, delay,
155 );
156 sleep(delay).await;
157 retry_context.fail_count = retry_context.fail_count.saturating_add(1);
158 retry_context.slept_so_far += delay;
159 continue;
160 }
161 ControlFlow::Break(()) => break Err(retry_context.error),
162 }
163 }
164 }
165 }
166 }
167
168 pub(crate) async fn copy_auth_to_dc(&self, target_dc_id: i32) -> Result<(), InvocationError> {
169 let mut auth_copied_to_dcs = self.0.auth_copied_to_dcs.lock().await;
170 if auth_copied_to_dcs.contains(&target_dc_id) {
171 return Ok(());
172 }
173
174 let home_dc_id = self.0.session.home_dc_id();
175 if target_dc_id == home_dc_id {
176 return Ok(());
177 }
178
179 let tl::enums::auth::ExportedAuthorization::Authorization(exported_auth) = self
180 .invoke(&tl::functions::auth::ExportAuthorization {
181 dc_id: target_dc_id,
182 })
183 .await?;
184
185 self.invoke_in_dc(
186 target_dc_id,
187 &tl::functions::auth::ImportAuthorization {
188 id: exported_auth.id,
189 bytes: exported_auth.bytes,
190 },
191 )
192 .await?;
193
194 auth_copied_to_dcs.push(target_dc_id);
195
196 Ok(())
197 }
198}