1use super::client::{ClientState, Connection};
9use super::{Client, ClientInner, Config};
10use crate::utils;
11use grammers_mtproto::mtp;
12use grammers_mtproto::transport;
13use grammers_mtsender::{self as sender, AuthorizationError, InvocationError, RpcError, Sender};
14use grammers_session::{ChatHashCache, MessageBox};
15use grammers_tl_types::{self as tl, Deserializable};
16use log::{debug, info};
17use sender::Enqueuer;
18use std::collections::{HashMap, VecDeque};
19use std::net::{Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::sync::{Arc, RwLock};
22use tokio::sync::oneshot::error::TryRecvError;
23use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
24
25const DC_ADDRESSES: [(Ipv4Addr, u16); 6] = [
31 (Ipv4Addr::new(0, 0, 0, 0), 0),
32 (Ipv4Addr::new(149, 154, 175, 53), 443),
33 (Ipv4Addr::new(149, 154, 167, 51), 443),
34 (Ipv4Addr::new(149, 154, 175, 100), 443),
35 (Ipv4Addr::new(149, 154, 167, 92), 443),
36 (Ipv4Addr::new(91, 108, 56, 190), 443),
37];
38
39const DEFAULT_DC: i32 = 2;
40
41pub(crate) async fn connect_sender(
42 dc_id: i32,
43 config: &Config,
44) -> Result<(Sender<transport::Full, mtp::Encrypted>, Enqueuer), AuthorizationError> {
45 let transport = transport::Full::new();
46
47 let addr: SocketAddr = if let Some(ip) = config.params.server_addr {
48 ip
49 } else {
50 DC_ADDRESSES[dc_id as usize].into()
51 };
52
53 let (mut sender, request_tx) = if let Some(auth_key) = config.session.dc_auth_key(dc_id) {
54 info!(
55 "creating a new sender with existing auth key to dc {} {:?}",
56 dc_id, addr
57 );
58
59 #[cfg(feature = "proxy")]
60 if let Some(url) = config.params.proxy_url.as_ref() {
61 sender::connect_via_proxy_with_auth(
62 transport,
63 addr,
64 auth_key,
65 url,
66 config.params.reconnection_policy,
67 )
68 .await?
69 } else {
70 sender::connect_with_auth(transport, addr, auth_key, config.params.reconnection_policy)
71 .await?
72 }
73
74 #[cfg(not(feature = "proxy"))]
75 sender::connect_with_auth(transport, addr, auth_key, config.params.reconnection_policy)
76 .await?
77 } else {
78 info!(
79 "creating a new sender and auth key in dc {} {:?}",
80 dc_id, addr
81 );
82
83 #[cfg(feature = "proxy")]
84 let (sender, tx) = if let Some(url) = config.params.proxy_url.as_ref() {
85 sender::connect_via_proxy(transport, addr, url, config.params.reconnection_policy)
86 .await?
87 } else {
88 sender::connect(transport, addr, config.params.reconnection_policy).await?
89 };
90
91 #[cfg(not(feature = "proxy"))]
92 let (sender, tx) =
93 sender::connect(transport, addr, config.params.reconnection_policy).await?;
94
95 config.session.insert_dc(dc_id, addr, sender.auth_key());
96 (sender, tx)
97 };
98
99 let _remote_config = sender
102 .invoke(&tl::functions::InvokeWithLayer {
103 layer: tl::LAYER,
104 query: tl::functions::InitConnection {
105 api_id: config.api_id,
106 device_model: config.params.device_model.clone(),
107 system_version: config.params.system_version.clone(),
108 app_version: config.params.app_version.clone(),
109 system_lang_code: config.params.system_lang_code.clone(),
110 lang_pack: "".into(),
111 lang_code: config.params.lang_code.clone(),
112 proxy: None,
113 params: None,
114 query: tl::functions::help::GetConfig {},
115 },
116 })
117 .await?;
118
119 Ok((sender, request_tx))
120}
121
122impl Client {
124 pub async fn connect(mut config: Config) -> Result<Self, AuthorizationError> {
153 let dc_id = config
154 .session
155 .get_user()
156 .map(|u| u.dc)
157 .unwrap_or(DEFAULT_DC);
158 let (sender, request_tx) = connect_sender(dc_id, &config).await?;
159 let message_box = if config.params.catch_up {
160 if let Some(state) = config.session.get_state() {
161 MessageBox::load(state)
162 } else {
163 MessageBox::new()
164 }
165 } else {
166 MessageBox::new()
169 };
170
171 let updates = if let Some(limit) = config.params.update_queue_limit {
173 VecDeque::with_capacity(limit)
174 } else {
175 VecDeque::new()
176 };
177
178 if let Some(0) = config.params.update_queue_limit {
180 config.params.update_queue_limit = None;
181 }
182
183 let self_user = config.session.get_user();
184
185 let should_get_state = message_box.is_empty() && config.session.signed_in();
187
188 let client = Self(Arc::new(ClientInner {
190 id: utils::generate_random_id(),
191 config,
192 conn: Connection::new(sender, request_tx),
193 state: RwLock::new(ClientState {
194 dc_id,
195 message_box,
196 chat_hashes: ChatHashCache::new(self_user.map(|u| (u.id, u.bot))),
197 last_update_limit_warn: None,
198 updates,
199 }),
200 downloader_map: AsyncRwLock::new(HashMap::new()),
201 }));
202
203 if should_get_state {
204 match client.invoke(&tl::functions::updates::GetState {}).await {
205 Ok(state) => {
206 {
207 client.0.state.write().unwrap().message_box.set_state(state);
208 }
209 client.sync_update_state();
210 }
211 Err(_err) => {
212 }
215 }
216 }
217
218 Ok(client)
219 }
220
221 pub async fn invoke<R: tl::RemoteCall>(
245 &self,
246 request: &R,
247 ) -> Result<R::Return, InvocationError> {
248 self.0
249 .conn
250 .invoke(
251 request,
252 self.0.config.params.flood_sleep_threshold,
253 |updates| self.process_socket_updates(updates),
254 )
255 .await
256 }
257
258 async fn export_authorization(
259 &self,
260 target_dc_id: i32,
261 ) -> Result<tl::types::auth::ExportedAuthorization, InvocationError> {
262 let request = tl::functions::auth::ExportAuthorization {
263 dc_id: target_dc_id,
264 };
265 match self.invoke(&request).await {
266 Ok(tl::enums::auth::ExportedAuthorization::Authorization(exported_auth)) => {
267 Ok(exported_auth)
268 }
269 Err(e) => Err(e),
270 }
271 }
272
273 async fn connect_sender(&self, dc_id: i32) -> Result<Arc<Connection>, InvocationError> {
274 let mut mutex = self.0.downloader_map.write().await;
275 debug!("Connecting new datacenter {}", dc_id);
276 match connect_sender(dc_id, &self.0.config).await {
277 Ok((new_sender, new_tx)) => {
278 let new_downloader = Arc::new(Connection::new(new_sender, new_tx));
279
280 let authorization = self.export_authorization(dc_id).await?;
282
283 let request = tl::functions::auth::ImportAuthorization {
285 id: authorization.id,
286 bytes: authorization.bytes,
287 };
288 new_downloader
289 .invoke(&request, self.0.config.params.flood_sleep_threshold, drop)
290 .await?;
291
292 mutex.insert(dc_id, new_downloader.clone());
293 Ok(new_downloader.clone())
294 }
295 Err(AuthorizationError::Invoke(e)) => Err(e),
296 Err(AuthorizationError::Gen(e)) => {
297 panic!("authorization key generation failed: {e}")
298 }
299 }
300 }
301
302 async fn get_downloader(&self, dc_id: i32) -> Result<Option<Arc<Connection>>, InvocationError> {
303 return Ok({
304 let guard = self.0.downloader_map.read().await;
305 guard.get(&dc_id).cloned()
306 });
307 }
308
309 pub async fn invoke_in_dc<R: tl::RemoteCall>(
310 &self,
311 request: &R,
312 dc_id: i32,
313 ) -> Result<R::Return, InvocationError> {
314 let downloader = match self.get_downloader(dc_id).await? {
315 None => self.connect_sender(dc_id).await?,
316 Some(fd) => fd,
317 };
318 downloader
319 .invoke(request, self.0.config.params.flood_sleep_threshold, drop)
320 .await
321 }
322
323 pub async fn step(&self) -> Result<(), sender::ReadError> {
340 let updates = self.0.conn.step().await?;
341 self.process_socket_updates(updates);
342 Ok(())
343 }
344
345 pub async fn run_until_disconnected(self) -> Result<(), sender::ReadError> {
358 loop {
359 self.step().await?;
361 }
362 }
363}
364
365impl Connection {
366 fn new(sender: Sender<transport::Full, mtp::Encrypted>, request_tx: Enqueuer) -> Self {
367 Self {
368 sender: AsyncMutex::new(sender),
369 request_tx: RwLock::new(request_tx),
370 step_counter: AtomicU32::new(0),
371 }
372 }
373
374 pub(crate) async fn invoke<R: tl::RemoteCall, F: Fn(Vec<tl::enums::Updates>)>(
375 &self,
376 request: &R,
377 flood_sleep_threshold: u32,
378 on_updates: F,
379 ) -> Result<R::Return, InvocationError> {
380 let mut slept_flood = false;
381
382 let mut rx = { self.request_tx.read().unwrap().enqueue(request) };
383 loop {
384 match rx.try_recv() {
385 Ok(response) => match response {
386 Ok(body) => break R::Return::from_bytes(&body).map_err(|e| e.into()),
387 Err(InvocationError::Rpc(RpcError {
388 name,
389 code: 420,
390 value: Some(seconds),
391 ..
392 })) if !slept_flood && seconds <= flood_sleep_threshold => {
393 let delay = std::time::Duration::from_secs(seconds as _);
394 info!(
395 "sleeping on {} for {:?} before retrying {}",
396 name,
397 delay,
398 std::any::type_name::<R>()
399 );
400 tokio::time::sleep(delay).await;
401 slept_flood = true;
402 rx = self.request_tx.read().unwrap().enqueue(request);
403 continue;
404 }
405 Err(e) => break Err(e),
406 },
407 Err(TryRecvError::Empty) => {
408 on_updates(self.step().await?);
409 }
410 Err(TryRecvError::Closed) => {
411 panic!("request channel dropped before receiving a result")
412 }
413 }
414 }
415 }
416
417 async fn step(&self) -> Result<Vec<tl::enums::Updates>, sender::ReadError> {
418 let ticket_number = self.step_counter.load(Ordering::SeqCst);
419 let mut sender = self.sender.lock().await;
420 match self.step_counter.compare_exchange(
421 ticket_number,
422 ticket_number.wrapping_add(1),
424 Ordering::SeqCst,
425 Ordering::SeqCst,
426 ) {
427 Ok(_) => sender.step().await, Err(_) => Ok(Vec::new()), }
430 }
431}