twitter_stream/
builder.rs

1//! A [`Builder`] type for [`TwitterStream`](crate::TwitterStream).
2//!
3//! The Streaming API has two different endpoints: [`POST statuses/filter`] and
4//! [`GET statuses/sample`]. `Builder` automatically determines which endpoint to use based on the
5//! specified parameters. Specifically, when any of [`follow`][Builder::follow],
6//! [`track`][Builder::track] and [`locations`][Builder::locations] parameters is specified,
7//! `filter` will be used, and when none is specified, `sample` will be used.
8//!
9//! [`POST statuses/filter`]: https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
10//! [`GET statuses/sample`]: https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
11//!
12//! `filter` yields public Tweets that match the filter predicates specified by the parameters,
13//! and `sample` yields "a small random sample" of all public Tweets.
14//!
15//! ## Example
16//!
17//! ```rust,no_run
18//! use futures::prelude::*;
19//! use twitter_stream::{builder::BoundingBox, Token};
20//!
21//! # #[tokio::main]
22//! # async fn main() {
23//! let token = Token::from_parts("consumer_key", "consumer_secret", "access_key", "access_secret");
24//!
25//! const TOKYO: &'static [BoundingBox] = &[BoundingBox::new(139.56, 35.53, 139.92, 35.82)];
26//!
27//! // Prints geolocated English Tweets associated with Tokyo (the 23 special wards).
28//! twitter_stream::Builder::new(token)
29//!     .locations(TOKYO)
30//!     .language("en")
31//!     .listen()
32//!     .try_flatten_stream()
33//!     .try_for_each(|json| {
34//!         println!("{}", json);
35//!         future::ok(())
36//!     })
37//!     .await
38//!     .unwrap();
39//! # }
40//! ```
41
42mod bounding_box;
43
44pub use http::Method as RequestMethod;
45pub use http::Uri;
46
47pub use bounding_box::BoundingBox;
48
49use std::borrow::Cow;
50use std::fmt::{self, Formatter};
51
52use http::header::{HeaderValue, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE};
53use http::Request;
54use slice_of_array::SliceFlatExt;
55
56use crate::service::HttpService;
57use crate::util::fmt_join;
58use crate::{FutureTwitterStream, Token};
59
60/// A builder for [`TwitterStream`](crate::TwitterStream).
61///
62/// See the [`builder`][crate::builder] module documentation for details.
63#[derive(Clone, Debug)]
64pub struct Builder<'a, T = Token> {
65    token: T,
66    endpoint: Option<(RequestMethod, Uri)>,
67    parameters: Parameters<'a>,
68}
69
70/// Parameters to the Streaming API.
71#[derive(Clone, Debug, Default, oauth::Request)]
72struct Parameters<'a> {
73    #[oauth1(skip_if = not)]
74    stall_warnings: bool,
75    filter_level: Option<FilterLevel>,
76    #[oauth1(skip_if = str::is_empty)]
77    language: Cow<'a, str>,
78    #[oauth1(encoded, fmt = fmt_follow, skip_if = <[_]>::is_empty)]
79    follow: Cow<'a, [u64]>,
80    #[oauth1(skip_if = str::is_empty)]
81    track: Cow<'a, str>,
82    #[oauth1(encoded, fmt = fmt_locations, skip_if = <[_]>::is_empty)]
83    #[allow(clippy::type_complexity)]
84    locations: Cow<'a, [BoundingBox]>,
85    #[oauth1(encoded)]
86    count: Option<i32>,
87}
88
89str_enum! {
90    /// Represents the [`filter_level`] parameter in API requests.
91    ///
92    /// [`filter_level`]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#filter-level
93    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
94    pub enum FilterLevel {
95        /// `"none"`
96        None = "none",
97        /// `"low"`
98        Low = "low",
99        /// `"medium"`
100        Medium = "medium",
101    }
102}
103
104const FILTER: &str = "https://stream.twitter.com/1.1/statuses/filter.json";
105const SAMPLE: &str = "https://stream.twitter.com/1.1/statuses/sample.json";
106
107impl<'a, C, A> Builder<'a, Token<C, A>>
108where
109    C: AsRef<str>,
110    A: AsRef<str>,
111{
112    /// Creates a builder.
113    pub fn new(token: Token<C, A>) -> Self {
114        Builder {
115            token,
116            endpoint: None,
117            parameters: Parameters::default(),
118        }
119    }
120
121    /// Start listening on the Streaming API endpoint, returning a `Future` which resolves
122    /// to a `Stream` yielding JSON messages from the API.
123    ///
124    /// # Panics
125    ///
126    /// This will panic if the underlying HTTPS connector failed to initialize.
127    #[cfg(feature = "hyper")]
128    #[cfg_attr(docsrs, doc(cfg(feature = "hyper")))]
129    pub fn listen(&self) -> crate::hyper::FutureTwitterStream {
130        let conn = hyper_tls::HttpsConnector::new();
131        self.listen_with_client(hyper_pkg::Client::builder().build::<_, hyper_pkg::Body>(conn))
132    }
133
134    /// Same as [`listen`](Builder::listen) except that it uses `client` to make HTTP request
135    /// to the endpoint.
136    ///
137    /// `client` must be able to handle the `https` scheme.
138    ///
139    /// # Panics
140    ///
141    /// This will call `<S as Service>::call` without checking for `<S as Service>::poll_ready`
142    /// and may cause a panic if `client` is not ready to send an HTTP request yet.
143    ///
144    /// # Example
145    ///
146    /// ```no_run
147    /// use tower::ServiceExt;
148    ///
149    /// # async fn doc() -> hyper_pkg::Result<()> {
150    /// # let mut client = hyper_pkg::Client::new();
151    /// # let token = twitter_stream::Token::from_parts("", "", "", "");
152    /// let stream = twitter_stream::Builder::new(token)
153    ///     .listen_with_client(client.ready_and().await?)
154    ///     .await
155    ///     .unwrap();
156    /// # Ok(())
157    /// # }
158    /// ```
159    pub fn listen_with_client<S, B>(&self, mut client: S) -> FutureTwitterStream<S::Future>
160    where
161        S: HttpService<B>,
162        B: From<Vec<u8>>,
163    {
164        let req = prepare_request(
165            self.endpoint.as_ref(),
166            self.token.as_ref(),
167            &self.parameters,
168        );
169        let response = client.call(req.map(Into::into));
170
171        FutureTwitterStream { response }
172    }
173}
174
175impl<'a, C, A> Builder<'a, Token<C, A>> {
176    /// Set the API endpoint URI to be connected.
177    ///
178    /// This overrides the default behavior of automatically determining the endpoint to use.
179    pub fn endpoint(&mut self, endpoint: impl Into<Option<(RequestMethod, Uri)>>) -> &mut Self {
180        self.endpoint = endpoint.into();
181        self
182    }
183
184    /// Reset the token to be used to log into Twitter.
185    pub fn token(&mut self, token: Token<C, A>) -> &mut Self {
186        self.token = token;
187        self
188    }
189
190    /// Set whether to receive messages when in danger of being disconnected.
191    ///
192    /// See the [Twitter Developer Documentation][1] for more information.
193    ///
194    /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#stall-warnings
195    pub fn stall_warnings(&mut self, stall_warnings: bool) -> &mut Self {
196        self.parameters.stall_warnings = stall_warnings;
197        self
198    }
199
200    /// Set the minimum `filter_level` Tweet attribute to receive.
201    /// The default is `FilterLevel::None`.
202    ///
203    /// See the [Twitter Developer Documentation][1] for more information.
204    ///
205    /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#filter-level
206    pub fn filter_level(&mut self, filter_level: impl Into<Option<FilterLevel>>) -> &mut Self {
207        self.parameters.filter_level = filter_level.into();
208        self
209    }
210
211    /// Set a comma-separated language identifiers to receive Tweets
212    /// written in the specified languages only.
213    ///
214    /// Setting an empty string will unset this parameter.
215    ///
216    /// See the [Twitter Developer Documentation][1] for more information.
217    ///
218    /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#language
219    pub fn language(&mut self, language: impl Into<Cow<'a, str>>) -> &mut Self {
220        self.parameters.language = language.into();
221        self
222    }
223
224    /// Set a list of user IDs to receive Tweets from the specified users.
225    ///
226    /// Setting an empty slice will unset this parameter.
227    ///
228    /// See the [Twitter Developer Documentation][1] for more information.
229    ///
230    /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#follow
231    pub fn follow(&mut self, follow: impl Into<Cow<'a, [u64]>>) -> &mut Self {
232        self.parameters.follow = follow.into();
233        self
234    }
235
236    /// A comma separated list of phrases to filter Tweets by.
237    ///
238    /// Setting an empty string will unset this parameter.
239    ///
240    /// See the [Twitter Developer Documentation][1] for more information.
241    ///
242    /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#track
243    pub fn track(&mut self, track: impl Into<Cow<'a, str>>) -> &mut Self {
244        self.parameters.track = track.into();
245        self
246    }
247
248    /// Set a list of bounding boxes to filter Tweets by.
249    ///
250    /// Setting an empty slice will unset this parameter.
251    ///
252    /// See [`BoundingBox`](struct.BoundingBox.html) and
253    /// the [Twitter Developer Documentation][1] for more information.
254    ///
255    /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#locations
256    pub fn locations(&mut self, locations: impl Into<Cow<'a, [BoundingBox]>>) -> &mut Self {
257        self.parameters.locations = locations.into();
258        self
259    }
260
261    /// The `count` parameter.
262    /// This parameter requires elevated access to use.
263    ///
264    /// See the [Twitter Developer Documentation][1] for more information.
265    ///
266    /// [1]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters#count
267    pub fn count(&mut self, count: impl Into<Option<i32>>) -> &mut Self {
268        self.parameters.count = count.into();
269        self
270    }
271}
272
273impl std::default::Default for FilterLevel {
274    fn default() -> Self {
275        FilterLevel::None
276    }
277}
278
279impl std::fmt::Display for FilterLevel {
280    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281        AsRef::<str>::as_ref(self).fmt(f)
282    }
283}
284
285fn prepare_request(
286    endpoint: Option<&(RequestMethod, Uri)>,
287    token: Token<&str, &str>,
288    parameters: &Parameters<'_>,
289) -> http::Request<Vec<u8>> {
290    let uri;
291    let (method, endpoint) = if let Some(&(ref method, ref endpoint)) = endpoint {
292        (method, endpoint)
293    } else if parameters.follow.is_empty()
294        && parameters.track.is_empty()
295        && parameters.locations.is_empty()
296    {
297        uri = Uri::from_static(SAMPLE);
298        (&RequestMethod::GET, &uri)
299    } else {
300        uri = Uri::from_static(FILTER);
301        (&RequestMethod::POST, &uri)
302    };
303
304    let req = Request::builder().method(method.clone());
305
306    let mut oauth = oauth::Builder::new(token.client.as_ref(), oauth::HmacSha1);
307    oauth.token(token.token.as_ref());
308
309    if RequestMethod::POST == method {
310        let authorization = oauth.post(endpoint, parameters);
311        let data = oauth::to_form_urlencoded(parameters);
312
313        req.uri(endpoint.clone())
314            .header(AUTHORIZATION, authorization)
315            .header(
316                CONTENT_TYPE,
317                HeaderValue::from_static("application/x-www-form-urlencoded"),
318            )
319            .header(CONTENT_LENGTH, data.len())
320            .body(data.into_bytes())
321            .unwrap()
322    } else {
323        let authorization = oauth.build(method.as_ref(), endpoint, parameters);
324        let uri = oauth::to_uri_query(endpoint.to_string(), parameters);
325
326        req.uri(uri)
327            .header(AUTHORIZATION, authorization)
328            .body(Vec::default())
329            .unwrap()
330    }
331}
332
333const COMMA: &str = "%2C";
334
335fn fmt_follow(ids: &[u64], f: &mut Formatter<'_>) -> fmt::Result {
336    fmt_join(ids, COMMA, f)
337}
338
339fn fmt_locations(locs: &[BoundingBox], f: &mut Formatter<'_>) -> fmt::Result {
340    fmt_join(BoundingBox::flatten_slice(locs).flat(), COMMA, f)
341}
342
343#[allow(clippy::trivially_copy_pass_by_ref)]
344fn not(p: &bool) -> bool {
345    !p
346}