grammers_client/client/
net.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8use super::client::{ClientState, Connection};
9use super::{Client, ClientInner, Config};
10use crate::utils;
11use grammers_mtproto::mtp;
12use grammers_mtproto::transport;
13use grammers_mtsender::{self as sender, AuthorizationError, InvocationError, RpcError, Sender};
14use grammers_session::{ChatHashCache, MessageBox};
15use grammers_tl_types::{self as tl, Deserializable};
16use log::{debug, info};
17use sender::Enqueuer;
18use std::collections::{HashMap, VecDeque};
19use std::net::{Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::sync::{Arc, RwLock};
22use tokio::sync::oneshot::error::TryRecvError;
23use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
24
25/// Socket addresses to Telegram datacenters, where the index into this array
26/// represents the data center ID.
27///
28/// The addresses were obtained from the `static` addresses through a call to
29/// `functions::help::GetConfig`.
30const DC_ADDRESSES: [(Ipv4Addr, u16); 6] = [
31    (Ipv4Addr::new(0, 0, 0, 0), 0),
32    (Ipv4Addr::new(149, 154, 175, 53), 443),
33    (Ipv4Addr::new(149, 154, 167, 51), 443),
34    (Ipv4Addr::new(149, 154, 175, 100), 443),
35    (Ipv4Addr::new(149, 154, 167, 92), 443),
36    (Ipv4Addr::new(91, 108, 56, 190), 443),
37];
38
39const DEFAULT_DC: i32 = 2;
40
41pub(crate) async fn connect_sender(
42    dc_id: i32,
43    config: &Config,
44) -> Result<(Sender<transport::Full, mtp::Encrypted>, Enqueuer), AuthorizationError> {
45    let transport = transport::Full::new();
46
47    let addr: SocketAddr = if let Some(ip) = config.params.server_addr {
48        ip
49    } else {
50        DC_ADDRESSES[dc_id as usize].into()
51    };
52
53    let (mut sender, request_tx) = if let Some(auth_key) = config.session.dc_auth_key(dc_id) {
54        info!(
55            "creating a new sender with existing auth key to dc {} {:?}",
56            dc_id, addr
57        );
58
59        #[cfg(feature = "proxy")]
60        if let Some(url) = config.params.proxy_url.as_ref() {
61            sender::connect_via_proxy_with_auth(
62                transport,
63                addr,
64                auth_key,
65                url,
66                config.params.reconnection_policy,
67            )
68            .await?
69        } else {
70            sender::connect_with_auth(transport, addr, auth_key, config.params.reconnection_policy)
71                .await?
72        }
73
74        #[cfg(not(feature = "proxy"))]
75        sender::connect_with_auth(transport, addr, auth_key, config.params.reconnection_policy)
76            .await?
77    } else {
78        info!(
79            "creating a new sender and auth key in dc {} {:?}",
80            dc_id, addr
81        );
82
83        #[cfg(feature = "proxy")]
84        let (sender, tx) = if let Some(url) = config.params.proxy_url.as_ref() {
85            sender::connect_via_proxy(transport, addr, url, config.params.reconnection_policy)
86                .await?
87        } else {
88            sender::connect(transport, addr, config.params.reconnection_policy).await?
89        };
90
91        #[cfg(not(feature = "proxy"))]
92        let (sender, tx) =
93            sender::connect(transport, addr, config.params.reconnection_policy).await?;
94
95        config.session.insert_dc(dc_id, addr, sender.auth_key());
96        (sender, tx)
97    };
98
99    // TODO handle -404 (we had a previously-valid authkey, but server no longer knows about it)
100    // TODO all up-to-date server addresses should be stored in the session for future initial connections
101    let _remote_config = sender
102        .invoke(&tl::functions::InvokeWithLayer {
103            layer: tl::LAYER,
104            query: tl::functions::InitConnection {
105                api_id: config.api_id,
106                device_model: config.params.device_model.clone(),
107                system_version: config.params.system_version.clone(),
108                app_version: config.params.app_version.clone(),
109                system_lang_code: config.params.system_lang_code.clone(),
110                lang_pack: "".into(),
111                lang_code: config.params.lang_code.clone(),
112                proxy: None,
113                params: None,
114                query: tl::functions::help::GetConfig {},
115            },
116        })
117        .await?;
118
119    Ok((sender, request_tx))
120}
121
122/// Method implementations directly related with network connectivity.
123impl Client {
124    /// Creates and returns a new client instance upon successful connection to Telegram.
125    ///
126    /// If the session in the configuration did not have an authorization key, a new one
127    /// will be created and the session will be saved with it.
128    ///
129    /// The connection will be initialized with the data from the input configuration.
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use grammers_client::{Client, Config};
135    /// use grammers_session::Session;
136    ///
137    /// // Note: these are example values and are not actually valid.
138    /// //       Obtain your own with the developer's phone at https://my.telegram.org.
139    /// const API_ID: i32 = 932939;
140    /// const API_HASH: &str = "514727c32270b9eb8cc16daf17e21e57";
141    ///
142    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
143    /// let client = Client::connect(Config {
144    ///     session: Session::load_file_or_create("hello-world.session")?,
145    ///     api_id: API_ID,
146    ///     api_hash: API_HASH.to_string(),
147    ///     params: Default::default(),
148    /// }).await?;
149    /// # Ok(())
150    /// # }
151    /// ```
152    pub async fn connect(mut config: Config) -> Result<Self, AuthorizationError> {
153        let dc_id = config
154            .session
155            .get_user()
156            .map(|u| u.dc)
157            .unwrap_or(DEFAULT_DC);
158        let (sender, request_tx) = connect_sender(dc_id, &config).await?;
159        let message_box = if config.params.catch_up {
160            if let Some(state) = config.session.get_state() {
161                MessageBox::load(state)
162            } else {
163                MessageBox::new()
164            }
165        } else {
166            // If the user doesn't want to bother with catching up on previous update, start with
167            // pristine state instead.
168            MessageBox::new()
169        };
170
171        // Pre-allocate the right `VecDeque` size if a limit is given.
172        let updates = if let Some(limit) = config.params.update_queue_limit {
173            VecDeque::with_capacity(limit)
174        } else {
175            VecDeque::new()
176        };
177
178        // "Remove" the limit to avoid checking for it (and avoid warning).
179        if let Some(0) = config.params.update_queue_limit {
180            config.params.update_queue_limit = None;
181        }
182
183        let self_user = config.session.get_user();
184
185        // Don't bother getting pristine update state if we're not logged in.
186        let should_get_state = message_box.is_empty() && config.session.signed_in();
187
188        // TODO Sender doesn't have a way to handle backpressure yet
189        let client = Self(Arc::new(ClientInner {
190            id: utils::generate_random_id(),
191            config,
192            conn: Connection::new(sender, request_tx),
193            state: RwLock::new(ClientState {
194                dc_id,
195                message_box,
196                chat_hashes: ChatHashCache::new(self_user.map(|u| (u.id, u.bot))),
197                last_update_limit_warn: None,
198                updates,
199            }),
200            downloader_map: AsyncRwLock::new(HashMap::new()),
201        }));
202
203        if should_get_state {
204            match client.invoke(&tl::functions::updates::GetState {}).await {
205                Ok(state) => {
206                    {
207                        client.0.state.write().unwrap().message_box.set_state(state);
208                    }
209                    client.sync_update_state();
210                }
211                Err(_err) => {
212                    // The account may no longer actually be logged in, or it can rarely fail.
213                    // `message_box` will try to correct its state as updates arrive.
214                }
215            }
216        }
217
218        Ok(client)
219    }
220
221    /// Invoke a raw API call. This directly sends the request to Telegram's servers.
222    ///
223    /// Using function definitions corresponding to a different layer is likely to cause the
224    /// responses to the request to not be understood.
225    ///
226    /// <div class="stab unstable">
227    ///
228    /// **Warning**: this method is **not** part of the stability guarantees of semantic
229    /// versioning. It **may** break during *minor* version changes (but not on patch version
230    /// changes). Use with care.
231    ///
232    /// </div>
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
238    /// use grammers_tl_types as tl;
239    ///
240    /// dbg!(client.invoke(&tl::functions::Ping { ping_id: 0 }).await?);
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub async fn invoke<R: tl::RemoteCall>(
245        &self,
246        request: &R,
247    ) -> Result<R::Return, InvocationError> {
248        self.0
249            .conn
250            .invoke(
251                request,
252                self.0.config.params.flood_sleep_threshold,
253                |updates| self.process_socket_updates(updates),
254            )
255            .await
256    }
257
258    async fn export_authorization(
259        &self,
260        target_dc_id: i32,
261    ) -> Result<tl::types::auth::ExportedAuthorization, InvocationError> {
262        let request = tl::functions::auth::ExportAuthorization {
263            dc_id: target_dc_id,
264        };
265        match self.invoke(&request).await {
266            Ok(tl::enums::auth::ExportedAuthorization::Authorization(exported_auth)) => {
267                Ok(exported_auth)
268            }
269            Err(e) => Err(e),
270        }
271    }
272
273    async fn connect_sender(&self, dc_id: i32) -> Result<Arc<Connection>, InvocationError> {
274        let mut mutex = self.0.downloader_map.write().await;
275        debug!("Connecting new datacenter {}", dc_id);
276        match connect_sender(dc_id, &self.0.config).await {
277            Ok((new_sender, new_tx)) => {
278                let new_downloader = Arc::new(Connection::new(new_sender, new_tx));
279
280                // export auth
281                let authorization = self.export_authorization(dc_id).await?;
282
283                // import into new sender
284                let request = tl::functions::auth::ImportAuthorization {
285                    id: authorization.id,
286                    bytes: authorization.bytes,
287                };
288                new_downloader
289                    .invoke(&request, self.0.config.params.flood_sleep_threshold, drop)
290                    .await?;
291
292                mutex.insert(dc_id, new_downloader.clone());
293                Ok(new_downloader.clone())
294            }
295            Err(AuthorizationError::Invoke(e)) => Err(e),
296            Err(AuthorizationError::Gen(e)) => {
297                panic!("authorization key generation failed: {e}")
298            }
299        }
300    }
301
302    async fn get_downloader(&self, dc_id: i32) -> Result<Option<Arc<Connection>>, InvocationError> {
303        return Ok({
304            let guard = self.0.downloader_map.read().await;
305            guard.get(&dc_id).cloned()
306        });
307    }
308
309    pub async fn invoke_in_dc<R: tl::RemoteCall>(
310        &self,
311        request: &R,
312        dc_id: i32,
313    ) -> Result<R::Return, InvocationError> {
314        let downloader = match self.get_downloader(dc_id).await? {
315            None => self.connect_sender(dc_id).await?,
316            Some(fd) => fd,
317        };
318        downloader
319            .invoke(request, self.0.config.params.flood_sleep_threshold, drop)
320            .await
321    }
322
323    /// Perform a single network step.
324    ///
325    /// Most commonly, you will want to use the higher-level abstraction [`Client::next_update`]
326    /// instead.
327    ///
328    /// # Examples
329    ///
330    /// ```
331    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
332    /// loop {
333    ///     // Process network events forever until we gracefully disconnect or get an error.
334    ///     client.step().await?;
335    /// }
336    /// # Ok(())
337    /// # }
338    /// ```
339    pub async fn step(&self) -> Result<(), sender::ReadError> {
340        let updates = self.0.conn.step().await?;
341        self.process_socket_updates(updates);
342        Ok(())
343    }
344
345    /// Run the client by repeatedly calling [`Client::step`] until a graceful disconnection
346    /// occurs, or a network error occurs. Incoming updates are ignored and simply dropped.
347    /// instead.
348    ///
349    /// # Examples
350    ///
351    /// ```
352    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
353    /// client.run_until_disconnected().await?;
354    /// # Ok(())
355    /// # }
356    /// ```
357    pub async fn run_until_disconnected(self) -> Result<(), sender::ReadError> {
358        loop {
359            // TODO review doc comments regarding disconnects
360            self.step().await?;
361        }
362    }
363}
364
365impl Connection {
366    fn new(sender: Sender<transport::Full, mtp::Encrypted>, request_tx: Enqueuer) -> Self {
367        Self {
368            sender: AsyncMutex::new(sender),
369            request_tx: RwLock::new(request_tx),
370            step_counter: AtomicU32::new(0),
371        }
372    }
373
374    pub(crate) async fn invoke<R: tl::RemoteCall, F: Fn(Vec<tl::enums::Updates>)>(
375        &self,
376        request: &R,
377        flood_sleep_threshold: u32,
378        on_updates: F,
379    ) -> Result<R::Return, InvocationError> {
380        let mut slept_flood = false;
381
382        let mut rx = { self.request_tx.read().unwrap().enqueue(request) };
383        loop {
384            match rx.try_recv() {
385                Ok(response) => match response {
386                    Ok(body) => break R::Return::from_bytes(&body).map_err(|e| e.into()),
387                    Err(InvocationError::Rpc(RpcError {
388                        name,
389                        code: 420,
390                        value: Some(seconds),
391                        ..
392                    })) if !slept_flood && seconds <= flood_sleep_threshold => {
393                        let delay = std::time::Duration::from_secs(seconds as _);
394                        info!(
395                            "sleeping on {} for {:?} before retrying {}",
396                            name,
397                            delay,
398                            std::any::type_name::<R>()
399                        );
400                        tokio::time::sleep(delay).await;
401                        slept_flood = true;
402                        rx = self.request_tx.read().unwrap().enqueue(request);
403                        continue;
404                    }
405                    Err(e) => break Err(e),
406                },
407                Err(TryRecvError::Empty) => {
408                    on_updates(self.step().await?);
409                }
410                Err(TryRecvError::Closed) => {
411                    panic!("request channel dropped before receiving a result")
412                }
413            }
414        }
415    }
416
417    async fn step(&self) -> Result<Vec<tl::enums::Updates>, sender::ReadError> {
418        let ticket_number = self.step_counter.load(Ordering::SeqCst);
419        let mut sender = self.sender.lock().await;
420        match self.step_counter.compare_exchange(
421            ticket_number,
422            // As long as the counter's modulo is larger than the amount of concurrent tasks, we're fine.
423            ticket_number.wrapping_add(1),
424            Ordering::SeqCst,
425            Ordering::SeqCst,
426        ) {
427            Ok(_) => sender.step().await, // We're the one to drive IO.
428            Err(_) => Ok(Vec::new()),     // A different task drove IO.
429        }
430    }
431}