relegram/
api.rs

1use std::sync::Arc;
2use hyper::Client;
3use hyper_tls::HttpsConnector;
4use hyper::client::HttpConnector;
5use hyper::Body;
6use error::*;
7use hyper::rt::{Future, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use hyper::Request;
11use serde_json;
12use responses::*;
13use requests::*;
14use std::str;
15use stream::UpdatesStream;
16use std::collections::VecDeque;
17use std::time::Duration;
18use std::time::Instant;
19use tokio::timer::Delay;
20use try_from::TryFrom;
21use std::cmp::max;
22
/// URI prefix for Bot API method calls; the bot token and method name are appended to it.
const BASE_API_URI: &str = "https://api.telegram.org/bot";
/// URI prefix for file downloads; the bot token and the file path are appended to it.
const GET_FILE_URI: &str = "https://api.telegram.org/file/bot";
25
26pub struct BotApiClient {
27    http_client: Arc<Client<HttpsConnector<HttpConnector>, Body>>,
28    token: Arc<String>,
29}
30
31impl Clone for BotApiClient {
32    fn clone(&self) -> Self {
33        BotApiClient {
34            http_client: Arc::clone(&self.http_client),
35            token: Arc::clone(&self.token),
36        }
37    }
38}
39
40pub enum HttpClient {
41    Default,
42    Owned(Client<HttpsConnector<HttpConnector>, Body>),
43    Arc(Arc<Client<HttpsConnector<HttpConnector>, Body>>),
44}
45
46impl BotApiClient {
47    pub fn new<S: Into<String>>(http_client: HttpClient, token: S) -> BotApiClient {
48        let http_client =
49            match http_client {
50                HttpClient::Default => {
51                    let https = HttpsConnector::new(1).expect("TLS initialization failed");
52                    Arc::new(Client::builder().build::<_, Body>(https))
53                }
54                HttpClient::Owned(http_client) => {
55                    Arc::new(http_client)
56                }
57                HttpClient::Arc(http_client) => {
58                    http_client
59                }
60            };
61        BotApiClient {
62            http_client,
63            token: Arc::new(token.into()),
64        }
65    }
66
67    pub fn incoming_updates(&self, mut request: GetUpdatesRequest) -> impl Stream<Item=Update, Error=Error> {
68        let cloned_self = self.clone();
69        let first_request = cloned_self.get_updates(&request);
70        let offset = request.offset;
71        let send_request = move |x| {
72            request.offset = x;
73            cloned_self.get_updates(&request)
74        };
75        UpdatesStream {
76            bot_api_client: send_request,
77            buffer: VecDeque::new(),
78            executing_request: first_request,
79            is_canceled: false,
80            last_id: offset,
81            has_error: false
82        }
83    }
84
85    pub fn download_file(&self, request: &GetFileRequest, timeout: Duration) -> impl Future<Item=Vec<u8>, Error=Error> {
86        let cloned_self = self.clone();
87        let download_future =
88            self.get_file(request, timeout)
89                .then(|file| {
90                    match file? {
91                        File { file_path: Some(path), .. } =>
92                            Ok(path),
93                        _ =>
94                            Err(Error::Unknown(String::from("File not found")))
95                    }
96                })
97                .and_then(move |file_path| {
98                    let uri = format!("{}{}/{}", GET_FILE_URI, cloned_self.token, file_path).parse().expect("Error has occurred while creating get_file uri");
99                    cloned_self.http_client.get(uri)
100                        .and_then(|response| {
101                            response
102                                .into_body()
103                                .concat2()
104                        })
105                        .map(|x| x.to_vec())
106                        .map_err(From::from)
107                });
108        BotApiClient::with_timeout(download_future, timeout)
109    }
110
111    pub fn send_message(&self, request: &SendMessageRequest, timeout: Duration) -> impl Future<Item=Message, Error=Error> {
112        self.send_request(request, <Message as TryFrom<raw::message::Message>>::try_from, timeout)
113    }
114
115    pub fn send_chat_action(&self, request: &SendChatAction, timeout: Duration) -> impl Future<Item=bool, Error=Error> {
116        fn id(val: bool) -> Result<bool, UnexpectedResponse> {
117            Ok(val)
118        }
119        self.send_request(request, id, timeout)
120    }
121
122    pub fn answer_callback_query(&self, request: &AnswerCallbackQuery, timeout: Duration) -> impl Future<Item=bool, Error=Error> {
123        fn id(val: bool) -> Result<bool, UnexpectedResponse> {
124            Ok(val)
125        }
126        self.send_request(request, id, timeout)
127    }
128
129    pub fn send_media_group(&self, request: &SendMediaGroupRequest, timeout: Duration) -> impl Future<Item=Vec<Message>, Error=Error> {
130        fn map(x: Vec<raw::message::Message>) -> Result<Vec<Message>, UnexpectedResponse> {
131            x.into_iter().map(TryFrom::try_from).collect()
132        }
133        self.send_request(request, map, timeout)
134    }
135
136    pub fn get_me(&self, timeout: Duration) -> impl Future<Item=User, Error=Error> {
137        self.send_request(&GetMe, Ok, timeout)
138    }
139
140    pub fn get_file(&self, request: &GetFileRequest, timeout: Duration) -> impl Future<Item=File, Error=Error> {
141        self.send_request(request, Ok, timeout)
142    }
143
144    pub fn get_updates(&self, request: &GetUpdatesRequest) -> impl Future<Item=Vec<Update>, Error=Error> {
145        fn map(x: Vec<raw::update::Update>) -> Result<Vec<Update>, UnexpectedResponse> {
146            x.into_iter()
147                .map(TryFrom::try_from)
148                .collect::<Result<Vec<Update>, UnexpectedResponse>>()
149        }
150
151        let timeout = Duration::from_secs(request.timeout.map(|x| max(x * 2, 10)).unwrap_or(10) as u64);
152
153        self.send_request(request, map, timeout)
154    }
155
156    fn send_request<TRequest, TResult, TMappedResult>(&self, request: &TRequest, result_map: fn(TResult) -> Result<TMappedResult, UnexpectedResponse>, timeout: Duration) -> impl Future<Item=TMappedResult, Error=Error>
157        where TRequest: Serialize + ::requests::Request,
158              TResult: DeserializeOwned,
159    {
160        let uri = format!("{}{}/{}", BASE_API_URI, self.token, request.method());
161        let request =
162            Request::post(uri)
163                .header("content-type", "application/json")
164                .body(Body::from(serde_json::to_string(request).expect("Error while serializing request")))
165                .expect("While creating request an error has occurred");
166
167
168        let api_request = self.http_client.request(request)
169            .and_then(|r| r.into_body().concat2())
170            .then(move |body| {
171                let body_ref = &body?;
172                let response: raw::TgResponse<TResult> = serde_json::from_slice(body_ref)?;
173                match response {
174                    raw::TgResponse { ok: true, result: Some(res), .. } =>
175                        result_map(res)
176                            .map_err(|err|
177                                str::from_utf8(body_ref)
178                                    .map(|x| Error::UnexpectedResponse { raw_response: String::from(x), kind: err })
179                                    .unwrap_or(Error::Unknown(String::from("Error while converting tg response to utf8 string")))),
180
181                    raw::TgResponse { ok: false, description: Some(description), error_code: Some(error_code), .. } =>
182                        Err(Error::TelegramApi { error_code, description }),
183
184                    _ =>
185                        Err(str::from_utf8(body_ref)
186                            .map(|x| Error::Unknown(String::from(x)))
187                            .unwrap_or(Error::Unknown(String::from("Error while converting tg response to utf8 string"))))
188                }
189            });
190        BotApiClient::with_timeout(api_request, timeout)
191    }
192
193    fn with_timeout<T>(fut: impl Future<Item=T, Error=Error>, timeout: Duration) -> impl Future<Item=T, Error=Error> {
194        let when = Instant::now() + timeout;
195        let delay = Delay::new(when)
196            .then(move |x| {
197                match x {
198                    Ok(_) =>
199                        Err(Error::TimedOut(timeout)),
200                    Err(e) =>
201                        Err(From::from(e))
202                }
203            });
204        fut.select(delay).then(|x| {
205            match x {
206                Ok((resp, _)) =>
207                    Ok(resp),
208                Err((err, _)) =>
209                    Err(err)
210            }
211        })
212    }
213}