twitter_stream/builder.rs
1//! A [`Builder`] type for [`TwitterStream`](crate::TwitterStream).
2//!
3//! The Streaming API has two different endpoints: [`POST statuses/filter`] and
4//! [`GET statuses/sample`]. `Builder` automatically determines which endpoint to use based on the
5//! specified parameters. Specifically, when any of [`follow`][Builder::follow],
6//! [`track`][Builder::track] and [`locations`][Builder::locations] parameters is specified,
7//! `filter` will be used, and when none is specified, `sample` will be used.
8//!
9//! [`POST statuses/filter`]: https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
//! [`GET statuses/sample`]: https://developer.twitter.com/en/docs/tweets/sample-realtime/api-reference/get-statuses-sample
11//!
12//! `filter` yields public Tweets that match the filter predicates specified by the parameters,
13//! and `sample` yields "a small random sample" of all public Tweets.
14//!
15//! ## Example
16//!
17//! ```rust,no_run
18//! use futures::prelude::*;
19//! use twitter_stream::{builder::BoundingBox, Token};
20//!
21//! # #[tokio::main]
22//! # async fn main() {
23//! let token = Token::from_parts("consumer_key", "consumer_secret", "access_key", "access_secret");
24//!
25//! const TOKYO: &'static [BoundingBox] = &[BoundingBox::new(139.56, 35.53, 139.92, 35.82)];
26//!
27//! // Prints geolocated English Tweets associated with Tokyo (the 23 special wards).
28//! twitter_stream::Builder::new(token)
29//! .locations(TOKYO)
30//! .language("en")
31//! .listen()
32//! .try_flatten_stream()
33//! .try_for_each(|json| {
34//! println!("{}", json);
35//! future::ok(())
36//! })
37//! .await
38//! .unwrap();
39//! # }
40//! ```
41
42mod bounding_box;
43
44pub use http::Method as RequestMethod;
45pub use http::Uri;
46
47pub use bounding_box::BoundingBox;
48
49use std::borrow::Cow;
50use std::fmt::{self, Formatter};
51
52use http::header::{HeaderValue, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE};
53use http::Request;
54use slice_of_array::SliceFlatExt;
55
56use crate::service::HttpService;
57use crate::util::fmt_join;
58use crate::{FutureTwitterStream, Token};
59
60/// A builder for [`TwitterStream`](crate::TwitterStream).
61///
62/// See the [`builder`][crate::builder] module documentation for details.
63#[derive(Clone, Debug)]
64pub struct Builder<'a, T = Token> {
65 token: T,
66 endpoint: Option<(RequestMethod, Uri)>,
67 parameters: Parameters<'a>,
68}
69
70/// Parameters to the Streaming API.
71#[derive(Clone, Debug, Default, oauth::Request)]
72struct Parameters<'a> {
73 #[oauth1(skip_if = not)]
74 stall_warnings: bool,
75 filter_level: Option<FilterLevel>,
76 #[oauth1(skip_if = str::is_empty)]
77 language: Cow<'a, str>,
78 #[oauth1(encoded, fmt = fmt_follow, skip_if = <[_]>::is_empty)]
79 follow: Cow<'a, [u64]>,
80 #[oauth1(skip_if = str::is_empty)]
81 track: Cow<'a, str>,
82 #[oauth1(encoded, fmt = fmt_locations, skip_if = <[_]>::is_empty)]
83 #[allow(clippy::type_complexity)]
84 locations: Cow<'a, [BoundingBox]>,
85 #[oauth1(encoded)]
86 count: Option<i32>,
87}
88
89str_enum! {
90 /// Represents the [`filter_level`] parameter in API requests.
91 ///
92 /// [`filter_level`]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#filter-level
93 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
94 pub enum FilterLevel {
95 /// `"none"`
96 None = "none",
97 /// `"low"`
98 Low = "low",
99 /// `"medium"`
100 Medium = "medium",
101 }
102}
103
/// URI of the `statuses/filter` endpoint, which `prepare_request` requests with `POST`.
const FILTER: &str = "https://stream.twitter.com/1.1/statuses/filter.json";
/// URI of the `statuses/sample` endpoint, which `prepare_request` requests with `GET`.
const SAMPLE: &str = "https://stream.twitter.com/1.1/statuses/sample.json";
106
107impl<'a, C, A> Builder<'a, Token<C, A>>
108where
109 C: AsRef<str>,
110 A: AsRef<str>,
111{
112 /// Creates a builder.
113 pub fn new(token: Token<C, A>) -> Self {
114 Builder {
115 token,
116 endpoint: None,
117 parameters: Parameters::default(),
118 }
119 }
120
121 /// Start listening on the Streaming API endpoint, returning a `Future` which resolves
122 /// to a `Stream` yielding JSON messages from the API.
123 ///
124 /// # Panics
125 ///
126 /// This will panic if the underlying HTTPS connector failed to initialize.
127 #[cfg(feature = "hyper")]
128 #[cfg_attr(docsrs, doc(cfg(feature = "hyper")))]
129 pub fn listen(&self) -> crate::hyper::FutureTwitterStream {
130 let conn = hyper_tls::HttpsConnector::new();
131 self.listen_with_client(hyper_pkg::Client::builder().build::<_, hyper_pkg::Body>(conn))
132 }
133
134 /// Same as [`listen`](Builder::listen) except that it uses `client` to make HTTP request
135 /// to the endpoint.
136 ///
137 /// `client` must be able to handle the `https` scheme.
138 ///
139 /// # Panics
140 ///
141 /// This will call `<S as Service>::call` without checking for `<S as Service>::poll_ready`
142 /// and may cause a panic if `client` is not ready to send an HTTP request yet.
143 ///
144 /// # Example
145 ///
146 /// ```no_run
147 /// use tower::ServiceExt;
148 ///
149 /// # async fn doc() -> hyper_pkg::Result<()> {
150 /// # let mut client = hyper_pkg::Client::new();
151 /// # let token = twitter_stream::Token::from_parts("", "", "", "");
152 /// let stream = twitter_stream::Builder::new(token)
153 /// .listen_with_client(client.ready_and().await?)
154 /// .await
155 /// .unwrap();
156 /// # Ok(())
157 /// # }
158 /// ```
159 pub fn listen_with_client<S, B>(&self, mut client: S) -> FutureTwitterStream<S::Future>
160 where
161 S: HttpService<B>,
162 B: From<Vec<u8>>,
163 {
164 let req = prepare_request(
165 self.endpoint.as_ref(),
166 self.token.as_ref(),
167 &self.parameters,
168 );
169 let response = client.call(req.map(Into::into));
170
171 FutureTwitterStream { response }
172 }
173}
174
175impl<'a, C, A> Builder<'a, Token<C, A>> {
176 /// Set the API endpoint URI to be connected.
177 ///
178 /// This overrides the default behavior of automatically determining the endpoint to use.
179 pub fn endpoint(&mut self, endpoint: impl Into<Option<(RequestMethod, Uri)>>) -> &mut Self {
180 self.endpoint = endpoint.into();
181 self
182 }
183
184 /// Reset the token to be used to log into Twitter.
185 pub fn token(&mut self, token: Token<C, A>) -> &mut Self {
186 self.token = token;
187 self
188 }
189
190 /// Set whether to receive messages when in danger of being disconnected.
191 ///
192 /// See the [Twitter Developer Documentation][1] for more information.
193 ///
194 /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#stall-warnings
195 pub fn stall_warnings(&mut self, stall_warnings: bool) -> &mut Self {
196 self.parameters.stall_warnings = stall_warnings;
197 self
198 }
199
200 /// Set the minimum `filter_level` Tweet attribute to receive.
201 /// The default is `FilterLevel::None`.
202 ///
203 /// See the [Twitter Developer Documentation][1] for more information.
204 ///
205 /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#filter-level
206 pub fn filter_level(&mut self, filter_level: impl Into<Option<FilterLevel>>) -> &mut Self {
207 self.parameters.filter_level = filter_level.into();
208 self
209 }
210
211 /// Set a comma-separated language identifiers to receive Tweets
212 /// written in the specified languages only.
213 ///
214 /// Setting an empty string will unset this parameter.
215 ///
216 /// See the [Twitter Developer Documentation][1] for more information.
217 ///
218 /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#language
219 pub fn language(&mut self, language: impl Into<Cow<'a, str>>) -> &mut Self {
220 self.parameters.language = language.into();
221 self
222 }
223
224 /// Set a list of user IDs to receive Tweets from the specified users.
225 ///
226 /// Setting an empty slice will unset this parameter.
227 ///
228 /// See the [Twitter Developer Documentation][1] for more information.
229 ///
230 /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#follow
231 pub fn follow(&mut self, follow: impl Into<Cow<'a, [u64]>>) -> &mut Self {
232 self.parameters.follow = follow.into();
233 self
234 }
235
236 /// A comma separated list of phrases to filter Tweets by.
237 ///
238 /// Setting an empty string will unset this parameter.
239 ///
240 /// See the [Twitter Developer Documentation][1] for more information.
241 ///
242 /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#track
243 pub fn track(&mut self, track: impl Into<Cow<'a, str>>) -> &mut Self {
244 self.parameters.track = track.into();
245 self
246 }
247
248 /// Set a list of bounding boxes to filter Tweets by.
249 ///
250 /// Setting an empty slice will unset this parameter.
251 ///
252 /// See [`BoundingBox`](struct.BoundingBox.html) and
253 /// the [Twitter Developer Documentation][1] for more information.
254 ///
255 /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#locations
256 pub fn locations(&mut self, locations: impl Into<Cow<'a, [BoundingBox]>>) -> &mut Self {
257 self.parameters.locations = locations.into();
258 self
259 }
260
261 /// The `count` parameter.
262 /// This parameter requires elevated access to use.
263 ///
264 /// See the [Twitter Developer Documentation][1] for more information.
265 ///
266 /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#count
267 pub fn count(&mut self, count: impl Into<Option<i32>>) -> &mut Self {
268 self.parameters.count = count.into();
269 self
270 }
271}
272
273impl std::default::Default for FilterLevel {
274 fn default() -> Self {
275 FilterLevel::None
276 }
277}
278
279impl std::fmt::Display for FilterLevel {
280 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281 AsRef::<str>::as_ref(self).fmt(f)
282 }
283}
284
285fn prepare_request(
286 endpoint: Option<&(RequestMethod, Uri)>,
287 token: Token<&str, &str>,
288 parameters: &Parameters<'_>,
289) -> http::Request<Vec<u8>> {
290 let uri;
291 let (method, endpoint) = if let Some(&(ref method, ref endpoint)) = endpoint {
292 (method, endpoint)
293 } else if parameters.follow.is_empty()
294 && parameters.track.is_empty()
295 && parameters.locations.is_empty()
296 {
297 uri = Uri::from_static(SAMPLE);
298 (&RequestMethod::GET, &uri)
299 } else {
300 uri = Uri::from_static(FILTER);
301 (&RequestMethod::POST, &uri)
302 };
303
304 let req = Request::builder().method(method.clone());
305
306 let mut oauth = oauth::Builder::new(token.client.as_ref(), oauth::HmacSha1);
307 oauth.token(token.token.as_ref());
308
309 if RequestMethod::POST == method {
310 let authorization = oauth.post(endpoint, parameters);
311 let data = oauth::to_form_urlencoded(parameters);
312
313 req.uri(endpoint.clone())
314 .header(AUTHORIZATION, authorization)
315 .header(
316 CONTENT_TYPE,
317 HeaderValue::from_static("application/x-www-form-urlencoded"),
318 )
319 .header(CONTENT_LENGTH, data.len())
320 .body(data.into_bytes())
321 .unwrap()
322 } else {
323 let authorization = oauth.build(method.as_ref(), endpoint, parameters);
324 let uri = oauth::to_uri_query(endpoint.to_string(), parameters);
325
326 req.uri(uri)
327 .header(AUTHORIZATION, authorization)
328 .body(Vec::default())
329 .unwrap()
330 }
331}
332
/// A percent-encoded comma (`,`), used as the separator by `fmt_follow` and `fmt_locations`.
const COMMA: &str = "%2C";
334
335fn fmt_follow(ids: &[u64], f: &mut Formatter<'_>) -> fmt::Result {
336 fmt_join(ids, COMMA, f)
337}
338
339fn fmt_locations(locs: &[BoundingBox], f: &mut Formatter<'_>) -> fmt::Result {
340 fmt_join(BoundingBox::flatten_slice(locs).flat(), COMMA, f)
341}
342
/// Returns the logical negation of `*p`.
///
/// Takes a reference because it is used as a `skip_if` predicate on
/// `Parameters::stall_warnings`, which is why the by-reference lint is silenced.
#[allow(clippy::trivially_copy_pass_by_ref)]
fn not(p: &bool) -> bool {
    let value = *p;
    !value
}
346}