telegram_bot/api.rs
1use std::sync::{
2 atomic::{AtomicUsize, Ordering},
3 Arc,
4};
5use std::time::Duration;
6
7use futures::{Future, FutureExt};
8use tokio::time::timeout;
9use tracing_futures::Instrument;
10
11use telegram_bot_raw::{HttpRequest, Request, ResponseType};
12
13use crate::connector::{default_connector, Connector};
14use crate::errors::{Error, ErrorKind};
15use crate::stream::UpdatesStream;
16
17/// Main type for sending requests to the Telegram bot API.
18#[derive(Clone)]
19pub struct Api(Arc<ApiInner>);
20
21struct ApiInner {
22 token: String,
23 connector: Box<dyn Connector>,
24 next_request_id: AtomicUsize,
25}
26
27impl Api {
28 /// Create a new `Api` instance.
29 ///
30 /// # Example
31 ///
32 /// Using default connector.
33 ///
34 /// ```rust
35 /// use telegram_bot::Api;
36 ///
37 /// # fn main() {
38 /// # let telegram_token = "token";
39 /// let api = Api::new(telegram_token);
40 /// # }
41 /// ```
42 pub fn new<T: AsRef<str>>(token: T) -> Self {
43 Self::with_connector(token, default_connector())
44 }
45
46 /// Create a new `Api` instance wtih custom connector.
47 pub fn with_connector<T: AsRef<str>>(token: T, connector: Box<dyn Connector>) -> Self {
48 Api(Arc::new(ApiInner {
49 token: token.as_ref().to_string(),
50 connector,
51 next_request_id: AtomicUsize::new(0),
52 }))
53 }
54
55 /// Create a stream which produces updates from the Telegram server.
56 ///
57 /// # Examples
58 ///
59 /// ```rust
60 /// # use telegram_bot::Api;
61 /// use futures::StreamExt;
62 ///
63 /// # #[tokio::main]
64 /// # async fn main() {
65 /// # let api: Api = Api::new("token");
66 ///
67 /// let mut stream = api.stream();
68 /// let update = stream.next().await;
69 /// println!("{:?}", update);
70 /// # }
71 /// ```
72 pub fn stream(&self) -> UpdatesStream {
73 UpdatesStream::new(&self)
74 }
75
76 /// Send a request to the Telegram server and do not wait for a response.
77 ///
78 /// # Examples
79 ///
80 /// ```rust
81 /// # use telegram_bot::{Api, ChatId, prelude::*};
82 /// # use std::time::Duration;
83 /// #
84 /// # #[tokio::main]
85 /// # async fn main() {
86 /// # let telegram_token = "token";
87 /// # let api = Api::new(telegram_token);
88 /// # if false {
89 /// let chat = ChatId::new(61031);
90 /// api.spawn(chat.text("Message"));
91 /// # }
92 /// # }
93 /// ```
94 pub fn spawn<Req: Request>(&self, request: Req) {
95 let api = self.clone();
96 if let Ok(request) = request.serialize() {
97 tokio::spawn(async move {
98 let _ = api.send_http_request::<Req::Response>(request).await;
99 });
100 }
101 }
102
103 /// Send a request to the Telegram server and wait for a response, timing out after `duration`.
104 /// Future will resolve to `None` if timeout fired.
105 ///
106 /// # Examples
107 ///
108 /// ```rust
109 /// # use telegram_bot::{Api, GetMe};
110 /// # use std::time::Duration;
111 /// #
112 /// # #[tokio::main]
113 /// # async fn main() {
114 /// # let telegram_token = "token";
115 /// # let api = Api::new(telegram_token);
116 /// # if false {
117 /// let result = api.send_timeout(GetMe, Duration::from_secs(2)).await;
118 /// println!("{:?}", result);
119 /// # }
120 /// # }
121 /// ```
122 pub fn send_timeout<Req: Request>(
123 &self,
124 request: Req,
125 duration: Duration,
126 ) -> impl Future<Output = Result<Option<<Req::Response as ResponseType>::Type>, Error>> + Send
127 {
128 let api = self.clone();
129 let request = request.serialize();
130 async move {
131 match timeout(
132 duration,
133 api.send_http_request::<Req::Response>(request.map_err(ErrorKind::from)?),
134 )
135 .await
136 {
137 Err(_) => Ok(None),
138 Ok(Ok(result)) => Ok(Some(result)),
139 Ok(Err(error)) => Err(error),
140 }
141 }
142 }
143
144 /// Send a request to the Telegram server and wait for a response.
145 ///
146 /// # Examples
147 ///
148 /// ```rust
149 /// # use telegram_bot::{Api, GetMe};
150 /// #
151 /// # #[tokio::main]
152 /// # async fn main() {
153 /// # let telegram_token = "token";
154 /// # let api = Api::new(telegram_token);
155 /// # if false {
156 /// let result = api.send(GetMe).await;
157 /// println!("{:?}", result);
158 /// # }
159 /// # }
160 /// ```
161 pub fn send<Req: Request>(
162 &self,
163 request: Req,
164 ) -> impl Future<Output = Result<<Req::Response as ResponseType>::Type, Error>> + Send {
165 let api = self.clone();
166 let request = request.serialize();
167 async move {
168 api.send_http_request::<Req::Response>(request.map_err(ErrorKind::from)?)
169 .await
170 }
171 }
172
173 async fn send_http_request<Resp: ResponseType>(
174 &self,
175 request: HttpRequest,
176 ) -> Result<Resp::Type, Error> {
177 let request_id = self.0.next_request_id.fetch_add(1, Ordering::Relaxed);
178 let span = tracing::trace_span!("send_http_request", request_id = request_id);
179 async {
180 tracing::trace!(name = %request.name(), body = %request.body, "sending request");
181 let http_response = self.0.connector.request(&self.0.token, request).await?;
182 tracing::trace!(
183 response = %match http_response.body {
184 Some(ref vec) => match std::str::from_utf8(vec) {
185 Ok(str) => str,
186 Err(_) => "<invalid utf-8 string>"
187 },
188 None => "<empty body>",
189 }, "response received"
190 );
191
192 let response = Resp::deserialize(http_response).map_err(ErrorKind::from)?;
193 tracing::trace!("response deserialized");
194 Ok(response)
195 }
196 .map(|result| {
197 if let Err(ref error) = result {
198 tracing::error!(error = %error);
199 }
200 result
201 })
202 .instrument(span)
203 .await
204 }
205}