grammers-client 0.4.0

A high level client to interact with Telegram's API.
Documentation
// Copyright 2020 - developers of the `grammers` project.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use super::{Client, ClientInner, Config};
use crate::client::client::FileDownloader;
use crate::utils::{self, AsyncMutex, Mutex};
use grammers_mtproto::mtp::{self, RpcError};
use grammers_mtproto::transport;
use grammers_mtproto::transport::Full;
use grammers_mtsender::{self as sender, AuthorizationError, InvocationError, Sender};
use grammers_session::{ChatHashCache, MessageBox};
use grammers_tl_types::{self as tl, Deserializable};
use log::{debug, info};
use mtp::Encrypted;
use sender::Enqueuer;
use std::collections::{HashMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{Notify, RwLock};

/// Socket addresses to Telegram datacenters, where the index into this array
/// represents the data center ID.
///
/// The addresses were obtained from the `static` addresses through a call to
/// `functions::help::GetConfig`.
const DC_ADDRESSES: [(Ipv4Addr, u16); 6] = [
    (Ipv4Addr::new(0, 0, 0, 0), 0),
    (Ipv4Addr::new(149, 154, 175, 53), 443),
    (Ipv4Addr::new(149, 154, 167, 51), 443),
    (Ipv4Addr::new(149, 154, 175, 100), 443),
    (Ipv4Addr::new(149, 154, 167, 92), 443),
    (Ipv4Addr::new(91, 108, 56, 190), 443),
];

const DEFAULT_DC: i32 = 2;

pub(crate) async fn connect_sender(
    dc_id: i32,
    config: &Config,
) -> Result<(Sender<transport::Full, mtp::Encrypted>, Enqueuer), AuthorizationError> {
    let transport = transport::Full::new();

    let addr: SocketAddr = if let Some(ip) = config.params.server_addr {
        ip
    } else {
        DC_ADDRESSES[dc_id as usize].into()
    };

    let (mut sender, request_tx) = if let Some(auth_key) = config.session.dc_auth_key(dc_id) {
        info!(
            "creating a new sender with existing auth key to dc {} {:?}",
            dc_id, addr
        );

        #[cfg(feature = "proxy")]
        if let Some(url) = config.params.proxy_url.as_ref() {
            sender::connect_via_proxy_with_auth(transport, addr, auth_key, url).await?
        } else {
            sender::connect_with_auth(transport, addr, auth_key).await?
        }

        #[cfg(not(feature = "proxy"))]
        sender::connect_with_auth(transport, addr, auth_key).await?
    } else {
        info!(
            "creating a new sender and auth key in dc {} {:?}",
            dc_id, addr
        );

        #[cfg(feature = "proxy")]
        let (sender, tx) = if let Some(url) = config.params.proxy_url.as_ref() {
            sender::connect_via_proxy(transport, addr, url).await?
        } else {
            sender::connect(transport, addr).await?
        };

        #[cfg(not(feature = "proxy"))]
        let (sender, tx) = sender::connect(transport, addr).await?;

        config.session.insert_dc(dc_id, addr, sender.auth_key());
        (sender, tx)
    };

    // TODO handle -404 (we had a previously-valid authkey, but server no longer knows about it)
    // TODO all up-to-date server addresses should be stored in the session for future initial connections
    let _remote_config = sender
        .invoke(&tl::functions::InvokeWithLayer {
            layer: tl::LAYER,
            query: tl::functions::InitConnection {
                api_id: config.api_id,
                device_model: config.params.device_model.clone(),
                system_version: config.params.system_version.clone(),
                app_version: config.params.app_version.clone(),
                system_lang_code: config.params.system_lang_code.clone(),
                lang_pack: "".into(),
                lang_code: config.params.lang_code.clone(),
                proxy: None,
                params: None,
                query: tl::functions::help::GetConfig {},
            },
        })
        .await?;

    Ok((sender, request_tx))
}

/// Method implementations directly related with network connectivity.
impl Client {
    /// Creates and returns a new client instance upon successful connection to Telegram.
    ///
    /// If the session in the configuration did not have an authorization key, a new one
    /// will be created and the session will be saved with it.
    ///
    /// The connection will be initialized with the data from the input configuration.
    ///
    /// # Examples
    ///
    /// ```
    /// use grammers_client::{Client, Config};
    /// use grammers_session::Session;
    ///
    /// // Note: these are example values and are not actually valid.
    /// //       Obtain your own with the developer's phone at https://my.telegram.org.
    /// const API_ID: i32 = 932939;
    /// const API_HASH: &str = "514727c32270b9eb8cc16daf17e21e57";
    ///
    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::connect(Config {
    ///     session: Session::load_file_or_create("hello-world.session")?,
    ///     api_id: API_ID,
    ///     api_hash: API_HASH.to_string(),
    ///     params: Default::default(),
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect(mut config: Config) -> Result<Self, AuthorizationError> {
        let dc_id = config
            .session
            .get_user()
            .map(|u| u.dc)
            .unwrap_or(DEFAULT_DC);
        let (sender, request_tx) = connect_sender(dc_id, &config).await?;
        let message_box = if config.params.catch_up {
            if let Some(state) = config.session.get_state() {
                MessageBox::load(state)
            } else {
                MessageBox::new()
            }
        } else {
            // If the user doesn't want to bother with catching up on previous update, start with
            // pristine state instead.
            MessageBox::new()
        };

        // Pre-allocate the right `VecDeque` size if a limit is given.
        let updates = if let Some(limit) = config.params.update_queue_limit {
            VecDeque::with_capacity(limit)
        } else {
            VecDeque::new()
        };

        // "Remove" the limit to avoid checking for it (and avoid warning).
        if let Some(0) = config.params.update_queue_limit {
            config.params.update_queue_limit = None;
        }

        let self_user = config.session.get_user();

        // TODO Sender doesn't have a way to handle backpressure yet
        let client = Self(Arc::new(ClientInner {
            id: utils::generate_random_id(),
            sender: AsyncMutex::new("client.sender", sender),
            stepping_done: Notify::new(),
            dc_id: Mutex::new("client.dc_id", dc_id),
            config,
            message_box: Mutex::new("client.message_box", message_box),
            chat_hashes: Mutex::new(
                "client.chat_hashes",
                ChatHashCache::new(self_user.map(|u| (u.id, u.bot))),
            ),
            last_update_limit_warn: Mutex::new("client.last_update_limit_warn", None),
            updates: Mutex::new("client.updates", updates),
            request_tx: Mutex::new("client.request_tx", request_tx),
            downloader_map: RwLock::new(HashMap::new()),
        }));

        // Don't bother getting pristine state if we're not logged in.
        if client
            .0
            .message_box
            .lock("client.connect.is_empty")
            .is_empty()
            && client.0.config.session.signed_in()
        {
            match client.invoke(&tl::functions::updates::GetState {}).await {
                Ok(state) => {
                    client
                        .0
                        .message_box
                        .lock("client.connect.set_state")
                        .set_state(state);
                    client.sync_update_state();
                }
                Err(_err) => {
                    // The account may no longer actually be logged in, or it can rarely fail.
                    // `message_box` will try to correct its state as updates arrive.
                }
            }
        }

        Ok(client)
    }

    /// Invoke a raw API call. This directly sends the request to Telegram's servers.
    ///
    /// Using function definitions corresponding to a different layer is likely to cause the
    /// responses to the request to not be understood.
    ///
    /// <div class="stab unstable">
    ///
    /// **Warning**: this method is **not** part of the stability guarantees of semantic
    /// versioning. It **may** break during *minor* version changes (but not on patch version
    /// changes). Use with care.
    ///
    /// </div>
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
    /// use grammers_tl_types as tl;
    ///
    /// dbg!(client.invoke(&tl::functions::Ping { ping_id: 0 }).await?);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn invoke<R: tl::RemoteCall>(
        &self,
        request: &R,
    ) -> Result<R::Return, InvocationError> {
        return self
            .invoke_request(
                request,
                &self.0.request_tx,
                &self.0.sender,
                &self.0.stepping_done,
            )
            .await;
    }

    pub(crate) async fn invoke_request<R: tl::RemoteCall>(
        &self,
        request: &R,
        enqueuer: &Mutex<Enqueuer>,
        sender: &AsyncMutex<Sender<Full, Encrypted>>,
        notifier: &Notify,
    ) -> Result<R::Return, InvocationError> {
        let mut slept_flood = false;
        let sleep_thresh = self.0.config.params.flood_sleep_threshold.unwrap_or(0);

        let mut rx = enqueuer.lock("invoke").enqueue(request);
        loop {
            match rx.try_recv() {
                Ok(response) => match response {
                    Ok(body) => break R::Return::from_bytes(&body).map_err(|e| e.into()),
                    Err(InvocationError::Rpc(RpcError {
                        name,
                        code: 420,
                        value: Some(seconds),
                        ..
                    })) if !slept_flood && seconds <= sleep_thresh => {
                        let delay = std::time::Duration::from_secs(seconds as _);
                        info!(
                            "sleeping on {} for {:?} before retrying {}",
                            name,
                            delay,
                            std::any::type_name::<R>()
                        );
                        tokio::time::sleep(delay).await;
                        slept_flood = true;
                        rx = enqueuer.lock("invoke").enqueue(request);
                        continue;
                    }
                    Err(e) => break Err(e),
                },
                Err(TryRecvError::Empty) => {
                    self.step_sender(sender, notifier).await?;
                }
                Err(TryRecvError::Closed) => {
                    panic!("request channel dropped before receiving a result")
                }
            }
        }
    }

    async fn export_authorization(
        &self,
        target_dc_id: i32,
    ) -> Result<tl::types::auth::ExportedAuthorization, InvocationError> {
        let request = tl::functions::auth::ExportAuthorization {
            dc_id: target_dc_id,
        };
        return match self.invoke(&request).await {
            Ok(tl::enums::auth::ExportedAuthorization::Authorization(exported_auth)) => {
                Ok(exported_auth)
            }
            Err(e) => Err(e),
        };
    }

    async fn connect_sender(&self, dc_id: i32) -> Result<Arc<FileDownloader>, InvocationError> {
        let mut mutex = self.0.downloader_map.write().await;
        debug!("Connecting new datacenter {}", dc_id);
        return match connect_sender(dc_id, &self.0.config).await {
            Ok((new_sender, new_tx)) => {
                let new_downloader = Arc::new(FileDownloader {
                    sender: AsyncMutex::new("dc_sender", new_sender),
                    request_tx: Mutex::new("dc_enqueuer", new_tx),
                    stepping_done: Notify::new(),
                });

                // export auth
                let authorization = self.export_authorization(dc_id).await?;

                // import into new sender
                let request = tl::functions::auth::ImportAuthorization {
                    id: authorization.id,
                    bytes: authorization.bytes,
                };
                self.invoke_request(
                    &request,
                    &new_downloader.request_tx,
                    &new_downloader.sender,
                    &new_downloader.stepping_done,
                )
                .await?;

                mutex.insert(dc_id, new_downloader.clone());
                Ok(new_downloader.clone())
            }
            Err(e) => panic!("Cannot obtain new sender to datacenter {}", e),
        };
    }

    async fn get_downloader(
        &self,
        dc_id: i32,
    ) -> Result<Option<Arc<FileDownloader>>, InvocationError> {
        return Ok({
            let guard = self.0.downloader_map.read().await;
            guard.get(&dc_id).map(|f| f.clone())
        });
    }

    pub async fn invoke_in_dc<R: tl::RemoteCall>(
        &self,
        request: &R,
        dc_id: i32,
    ) -> Result<R::Return, InvocationError> {
        let downloader = match self.get_downloader(dc_id).await? {
            None => self.connect_sender(dc_id).await?,
            Some(fd) => fd,
        };
        return self
            .invoke_request(
                request,
                &downloader.request_tx,
                &downloader.sender,
                &downloader.stepping_done,
            )
            .await;
    }

    /// Perform a single network step.
    ///
    /// Most commonly, you will want to use the higher-level abstraction [`Client::next_update`]
    /// instead.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
    /// loop {
    ///     // Process network events forever until we gracefully disconnect or get an error.
    ///     client.step().await?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn step(&self) -> Result<(), sender::ReadError> {
        return self
            .step_sender(&self.0.sender, &self.0.stepping_done)
            .await;
    }
    async fn step_sender(
        &self,
        sender: &AsyncMutex<Sender<Full, Encrypted>>,
        notifier: &Notify,
    ) -> Result<(), sender::ReadError> {
        match sender.try_lock("client.step") {
            Ok(mut sender) => {
                // Sender was unlocked, we're the ones that will perform the network step.
                let updates = sender.step().await?;
                notifier.notify_waiters();
                self.process_socket_updates(updates);

                // TODO request cancellation if this is Err
                // (perhaps a method on the sender to cancel_all)
                Ok(())
            }
            Err(_) => {
                // Someone else is already performing the network step. Wait for the step to
                // complete and return immediately without stepping again. The caller wants
                // *one* step to complete, but it doesn't care *who* completes it.
                notifier.notified().await;
                Ok(())
            }
        }
    }

    /// Run the client by repeatedly calling [`Client::step`] until a graceful disconnection
    /// occurs, or a network error occurs. Incoming updates are ignored and simply dropped.
    /// instead.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
    /// client.run_until_disconnected().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_until_disconnected(self) -> Result<(), sender::ReadError> {
        loop {
            // TODO review doc comments regarding disconnects
            self.step().await?;
        }
    }
}