1use std::sync::Arc;
2use hyper::Client;
3use hyper_tls::HttpsConnector;
4use hyper::client::HttpConnector;
5use hyper::Body;
6use error::*;
7use hyper::rt::{Future, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use hyper::Request;
11use serde_json;
12use responses::*;
13use requests::*;
14use std::str;
15use stream::UpdatesStream;
16use std::collections::VecDeque;
17use std::time::Duration;
18use std::time::Instant;
19use tokio::timer::Delay;
20use try_from::TryFrom;
21use std::cmp::max;
22
/// URL prefix for Bot API method calls; the bot token and the method name are appended to it.
const BASE_API_URI: &str = "https://api.telegram.org/bot";
/// URL prefix for file downloads; the bot token and the file path are appended to it.
const GET_FILE_URI: &str = "https://api.telegram.org/file/bot";
25
26pub struct BotApiClient {
27 http_client: Arc<Client<HttpsConnector<HttpConnector>, Body>>,
28 token: Arc<String>,
29}
30
31impl Clone for BotApiClient {
32 fn clone(&self) -> Self {
33 BotApiClient {
34 http_client: Arc::clone(&self.http_client),
35 token: Arc::clone(&self.token),
36 }
37 }
38}
39
40pub enum HttpClient {
41 Default,
42 Owned(Client<HttpsConnector<HttpConnector>, Body>),
43 Arc(Arc<Client<HttpsConnector<HttpConnector>, Body>>),
44}
45
46impl BotApiClient {
47 pub fn new<S: Into<String>>(http_client: HttpClient, token: S) -> BotApiClient {
48 let http_client =
49 match http_client {
50 HttpClient::Default => {
51 let https = HttpsConnector::new(1).expect("TLS initialization failed");
52 Arc::new(Client::builder().build::<_, Body>(https))
53 }
54 HttpClient::Owned(http_client) => {
55 Arc::new(http_client)
56 }
57 HttpClient::Arc(http_client) => {
58 http_client
59 }
60 };
61 BotApiClient {
62 http_client,
63 token: Arc::new(token.into()),
64 }
65 }
66
67 pub fn incoming_updates(&self, mut request: GetUpdatesRequest) -> impl Stream<Item=Update, Error=Error> {
68 let cloned_self = self.clone();
69 let first_request = cloned_self.get_updates(&request);
70 let offset = request.offset;
71 let send_request = move |x| {
72 request.offset = x;
73 cloned_self.get_updates(&request)
74 };
75 UpdatesStream {
76 bot_api_client: send_request,
77 buffer: VecDeque::new(),
78 executing_request: first_request,
79 is_canceled: false,
80 last_id: offset,
81 has_error: false
82 }
83 }
84
85 pub fn download_file(&self, request: &GetFileRequest, timeout: Duration) -> impl Future<Item=Vec<u8>, Error=Error> {
86 let cloned_self = self.clone();
87 let download_future =
88 self.get_file(request, timeout)
89 .then(|file| {
90 match file? {
91 File { file_path: Some(path), .. } =>
92 Ok(path),
93 _ =>
94 Err(Error::Unknown(String::from("File not found")))
95 }
96 })
97 .and_then(move |file_path| {
98 let uri = format!("{}{}/{}", GET_FILE_URI, cloned_self.token, file_path).parse().expect("Error has occurred while creating get_file uri");
99 cloned_self.http_client.get(uri)
100 .and_then(|response| {
101 response
102 .into_body()
103 .concat2()
104 })
105 .map(|x| x.to_vec())
106 .map_err(From::from)
107 });
108 BotApiClient::with_timeout(download_future, timeout)
109 }
110
111 pub fn send_message(&self, request: &SendMessageRequest, timeout: Duration) -> impl Future<Item=Message, Error=Error> {
112 self.send_request(request, <Message as TryFrom<raw::message::Message>>::try_from, timeout)
113 }
114
115 pub fn send_chat_action(&self, request: &SendChatAction, timeout: Duration) -> impl Future<Item=bool, Error=Error> {
116 fn id(val: bool) -> Result<bool, UnexpectedResponse> {
117 Ok(val)
118 }
119 self.send_request(request, id, timeout)
120 }
121
122 pub fn answer_callback_query(&self, request: &AnswerCallbackQuery, timeout: Duration) -> impl Future<Item=bool, Error=Error> {
123 fn id(val: bool) -> Result<bool, UnexpectedResponse> {
124 Ok(val)
125 }
126 self.send_request(request, id, timeout)
127 }
128
129 pub fn send_media_group(&self, request: &SendMediaGroupRequest, timeout: Duration) -> impl Future<Item=Vec<Message>, Error=Error> {
130 fn map(x: Vec<raw::message::Message>) -> Result<Vec<Message>, UnexpectedResponse> {
131 x.into_iter().map(TryFrom::try_from).collect()
132 }
133 self.send_request(request, map, timeout)
134 }
135
136 pub fn get_me(&self, timeout: Duration) -> impl Future<Item=User, Error=Error> {
137 self.send_request(&GetMe, Ok, timeout)
138 }
139
140 pub fn get_file(&self, request: &GetFileRequest, timeout: Duration) -> impl Future<Item=File, Error=Error> {
141 self.send_request(request, Ok, timeout)
142 }
143
144 pub fn get_updates(&self, request: &GetUpdatesRequest) -> impl Future<Item=Vec<Update>, Error=Error> {
145 fn map(x: Vec<raw::update::Update>) -> Result<Vec<Update>, UnexpectedResponse> {
146 x.into_iter()
147 .map(TryFrom::try_from)
148 .collect::<Result<Vec<Update>, UnexpectedResponse>>()
149 }
150
151 let timeout = Duration::from_secs(request.timeout.map(|x| max(x * 2, 10)).unwrap_or(10) as u64);
152
153 self.send_request(request, map, timeout)
154 }
155
156 fn send_request<TRequest, TResult, TMappedResult>(&self, request: &TRequest, result_map: fn(TResult) -> Result<TMappedResult, UnexpectedResponse>, timeout: Duration) -> impl Future<Item=TMappedResult, Error=Error>
157 where TRequest: Serialize + ::requests::Request,
158 TResult: DeserializeOwned,
159 {
160 let uri = format!("{}{}/{}", BASE_API_URI, self.token, request.method());
161 let request =
162 Request::post(uri)
163 .header("content-type", "application/json")
164 .body(Body::from(serde_json::to_string(request).expect("Error while serializing request")))
165 .expect("While creating request an error has occurred");
166
167
168 let api_request = self.http_client.request(request)
169 .and_then(|r| r.into_body().concat2())
170 .then(move |body| {
171 let body_ref = &body?;
172 let response: raw::TgResponse<TResult> = serde_json::from_slice(body_ref)?;
173 match response {
174 raw::TgResponse { ok: true, result: Some(res), .. } =>
175 result_map(res)
176 .map_err(|err|
177 str::from_utf8(body_ref)
178 .map(|x| Error::UnexpectedResponse { raw_response: String::from(x), kind: err })
179 .unwrap_or(Error::Unknown(String::from("Error while converting tg response to utf8 string")))),
180
181 raw::TgResponse { ok: false, description: Some(description), error_code: Some(error_code), .. } =>
182 Err(Error::TelegramApi { error_code, description }),
183
184 _ =>
185 Err(str::from_utf8(body_ref)
186 .map(|x| Error::Unknown(String::from(x)))
187 .unwrap_or(Error::Unknown(String::from("Error while converting tg response to utf8 string"))))
188 }
189 });
190 BotApiClient::with_timeout(api_request, timeout)
191 }
192
193 fn with_timeout<T>(fut: impl Future<Item=T, Error=Error>, timeout: Duration) -> impl Future<Item=T, Error=Error> {
194 let when = Instant::now() + timeout;
195 let delay = Delay::new(when)
196 .then(move |x| {
197 match x {
198 Ok(_) =>
199 Err(Error::TimedOut(timeout)),
200 Err(e) =>
201 Err(From::from(e))
202 }
203 });
204 fut.select(delay).then(|x| {
205 match x {
206 Ok((resp, _)) =>
207 Ok(resp),
208 Err((err, _)) =>
209 Err(err)
210 }
211 })
212 }
213}