egg_mode/stream/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5//! Access to the Streaming API.
6//!
7//! The Streaming API gives real-time access to tweets, narrowed by
8//! search phrases, user id or location. A standard user is able to filter by up to
9//! 400 keywords, 5,000 user ids and 25 locations.
10//! See the [official documentation](https://developer.twitter.com/en/docs/tweets/filter-realtime/overview) for more details.
11//!
12//! ### Example
13//! ```rust,no_run
14//! # #[tokio::main]
15//! # async fn main() {
16//! # let token: egg_mode::Token = unimplemented!();
17//! use egg_mode::stream::{filter, StreamMessage};
18//! use futures::{Stream, TryStreamExt};
19//!
20//! let stream = filter()
21//!     // find tweets mentioning any of the following:
22//!     .track(&["rustlang", "python", "java", "javascript"])
23//!     .start(&token);
24//!
25//! stream.try_for_each(|m| {
26//!     // Check the message type and print tweet to console
27//!     if let StreamMessage::Tweet(tweet) = m {
28//!         println!("Received tweet from {}:\n{}\n", tweet.user.unwrap().name, tweet.text);
29//!     }
30//!     futures::future::ok(())
31//! }).await.expect("Stream error");
32//! # }
33//! ```
34//! ### Connection notes
//! Maintaining a stable streaming connection requires a certain amount of effort to
//! account for random disconnects, network resets and stalls. The key points are:
37//!
38//! * The Twitter API sends a Ping message every 30 seconds of message inactivity. So set a timeout
39//! such that after (say) 1 minute of inactivity, the client bounces the connection. This will protect
40//! against network stalls
//! * Twitter will rate-limit reconnect attempts. So attempt connections with a linear or exponential
//! backoff strategy
43//! * In the case of an unreliable connection (e.g. mobile network), fall back to the polling API
44//!
45//! The [official guide](https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/connecting) has more information.
46use std::future::Future;
47use std::pin::Pin;
48use std::str::FromStr;
49use std::task::{Context, Poll};
50use std::{self, io};
51
52use futures::Stream;
53use hyper::client::ResponseFuture;
54use hyper::{Body, Request};
55use serde::de::Error;
56use serde::{Deserialize, Deserializer, Serialize};
57use serde_json;
58
59use crate::auth::Token;
60use crate::common::*;
61use crate::tweet::Tweet;
62use crate::{error, links};
63
64// TODO rewrite this
65// https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
/// Represents the kinds of messages that can be sent over Twitter's Streaming API.
///
/// Values are produced by the `Deserialize` impl (from a JSON payload) or by the `FromStr` impl,
/// which additionally maps blank input to [`StreamMessage::Ping`].
#[derive(Debug)]
pub enum StreamMessage {
    /// A blank line, sent periodically to keep the connection alive.
    Ping,
    /// A list of accounts the authenticated user follows, sent at the beginning of the session for
    /// user streams.
    FriendList(Vec<u64>),
    /// A new tweet.
    ///
    /// Note that the `entities` inside the `user` field will be empty for tweets received via the
    /// Streaming API.
    Tweet(Tweet),
    /// Notice given when a user deletes a post.
    ///
    /// Clients are expected to comply with these notices by removing the status "from memory and
    /// any storage or archive, even in the rare case where a deletion message arrives earlier in
    /// the stream than the Tweet it references."
    Delete {
        /// The status that was deleted.
        status_id: u64,
        /// The user that deleted the status.
        user_id: u64,
    },
    /// Notice given when a user removes geolocation information from their profile.
    ///
    /// Clients are expected to comply by deleting cached geolocation information from tweets by
    /// the given user, for any tweets up to and including the given status ID. According to
    /// Twitter's documentation, "These messages may also arrive before a Tweet which falls into
    /// the specified range, although this is rare."
    ScrubGeo {
        /// The user whose geolocation information needs to be scrubbed.
        user_id: u64,
        /// The last status ID to scrub information from.
        up_to_status_id: u64,
    },
    /// Placeholder message used to indicate that a specific tweet has been withheld in certain
    /// countries.
    StatusWithheld {
        /// The status that was withheld.
        status_id: u64,
        /// The user that posted the status.
        user_id: u64,
        /// A list of uppercase two-character country codes listing the countries where the tweet
        /// was withheld.
        withheld_in_countries: Vec<String>,
    },
    /// Placeholder message used to indicate that a specific user's content has been withheld in
    /// certain countries.
    UserWithheld {
        /// The user whose content was withheld.
        user_id: u64,
        /// A list of uppercase two-character country codes listing the countries where the content
        /// was withheld.
        withheld_in_countries: Vec<String>,
    },
    /// An error message that may be delivered immediately prior to Twitter disconnecting the
    /// stream.
    ///
    /// Note that if the stream is disconnected due to network issues or the client reading
    /// messages too slowly, it's possible that this message may not be received.
    ///
    /// The enclosed values are an error code and error description, in that order. A
    /// non-exhaustive list of error codes and their associated reasons is available in [Twitter's
    /// stream documentation][stream-doc], under "Disconnect messages (disconnect)".
    ///
    /// [stream-doc]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
    Disconnect(u64, String),
    /// An unhandled message payload.
    ///
    /// Twitter can add new streaming messages to the API, and egg-mode includes them here so that
    /// they can be used before egg-mode has a chance to handle them. The raw JSON value is kept
    /// as received.
    Unknown(serde_json::Value),
    //TODO: stall warnings? "follows over limit" warnings? (other warnings?)
}
141
142impl<'de> Deserialize<'de> for StreamMessage {
143    fn deserialize<D>(deser: D) -> Result<StreamMessage, D::Error>
144    where
145        D: Deserializer<'de>,
146    {
147        macro_rules! fetch {
148            ($input: ident, $key: expr) => {
149                $input
150                    .get($key)
151                    .and_then(|val| serde_json::from_value(val.clone()).ok())
152                    .ok_or_else(|| D::Error::custom("Failed"))
153            };
154        }
155
156        let input = serde_json::Value::deserialize(deser)?;
157        let msg = if let Some(del) = input.get("delete").and_then(|d| d.get("status")) {
158            StreamMessage::Delete {
159                status_id: fetch!(del, "id")?,
160                user_id: fetch!(del, "user_id")?,
161            }
162        } else if let Some(scrub) = input.get("scrub_geo") {
163            StreamMessage::ScrubGeo {
164                user_id: fetch!(scrub, "user_id")?,
165                up_to_status_id: fetch!(scrub, "up_to_status_id")?,
166            }
167        } else if let Some(tweet) = input.get("status_withheld") {
168            StreamMessage::StatusWithheld {
169                status_id: fetch!(tweet, "id")?,
170                user_id: fetch!(tweet, "user_id")?,
171                withheld_in_countries: fetch!(tweet, "withheld_in_countries")?,
172            }
173        } else if let Some(user) = input.get("user_withheld") {
174            StreamMessage::UserWithheld {
175                user_id: fetch!(user, "id")?,
176                withheld_in_countries: fetch!(user, "withheld_in_countries")?,
177            }
178        } else if let Some(err) = input.get("disconnect") {
179            StreamMessage::Disconnect(fetch!(err, "code")?, fetch!(err, "reason")?)
180        } else if let Some(friends) = input.get("friends") {
181            StreamMessage::FriendList(
182                serde_json::from_value(friends.clone())
183                    .map_err(|e| D::Error::custom(format!("{}", e)))?,
184            )
185        // TODO remove clone?
186        } else if let Ok(tweet) = serde_json::from_value::<Tweet>(input.clone()) {
187            StreamMessage::Tweet(tweet)
188        } else {
189            StreamMessage::Unknown(input.clone())
190        };
191        Ok(msg)
192    }
193}
194
195impl FromStr for StreamMessage {
196    type Err = error::Error;
197    fn from_str(input: &str) -> Result<Self, error::Error> {
198        let input = input.trim();
199        if input.is_empty() {
200            Ok(StreamMessage::Ping)
201        } else {
202            Ok(serde_json::from_str(input)?)
203        }
204    }
205}
206
/// A `Stream` that represents a connection to the Twitter Streaming API.
///
/// The connection is driven lazily from `poll_next`: the stored request is sent on the first
/// poll, the response is awaited, and then the response body is read chunk by chunk.
#[must_use = "Streams are lazy and do nothing unless polled"]
pub struct TwitterStream {
    // Bytes received from the body that have not yet been handed out as a message.
    buf: Vec<u8>,
    // The prepared request; taken (left as `None`) when the first poll sends it.
    request: Option<Request<Body>>,
    // The in-flight response future; present from sending the request until it resolves.
    response: Option<ResponseFuture>,
    // The response body, set once a response with a success status has arrived.
    body: Option<Body>,
}
215
216impl TwitterStream {
217    pub(crate) fn new(request: Request<Body>) -> TwitterStream {
218        TwitterStream {
219            buf: vec![],
220            request: Some(request),
221            response: None,
222            body: None,
223        }
224    }
225}
226
227impl Stream for TwitterStream {
228    type Item = Result<StreamMessage, error::Error>;
229
230    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
231        if let Some(req) = self.request.take() {
232            self.response = Some(get_response(req));
233        }
234
235        if let Some(mut resp) = self.response.take() {
236            match Pin::new(&mut resp).poll(cx) {
237                Poll::Pending => {
238                    self.response = Some(resp);
239                    return Poll::Pending;
240                }
241                Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
242                Poll::Ready(Ok(resp)) => {
243                    let status = resp.status();
244                    if !status.is_success() {
245                        //TODO: should i try to pull the response regardless?
246                        return Poll::Ready(Some(Err(error::Error::BadStatus(status))));
247                    }
248
249                    self.body = Some(resp.into_body());
250                }
251            }
252        }
253
254        if let Some(mut body) = self.body.take() {
255            loop {
256                match Pin::new(&mut body).poll_next(cx) {
257                    Poll::Pending => {
258                        self.body = Some(body);
259                        return Poll::Pending;
260                    }
261                    Poll::Ready(None) => {
262                        return Poll::Ready(None);
263                    }
264                    Poll::Ready(Some(Err(e))) => {
265                        self.body = Some(body);
266                        return Poll::Ready(Some(Err(e.into())));
267                    }
268                    Poll::Ready(Some(Ok(chunk))) => {
269                        self.buf.extend(&*chunk);
270
271                        if let Some(pos) = self.buf.windows(2).position(|w| w == b"\r\n") {
272                            self.body = Some(body);
273                            let pos = pos + 2;
274                            let resp = if let Ok(msg_str) = std::str::from_utf8(&self.buf[..pos]) {
275                                StreamMessage::from_str(msg_str)
276                            } else {
277                                Err(io::Error::new(
278                                    io::ErrorKind::InvalidData,
279                                    "stream did not contain valid UTF-8",
280                                )
281                                .into())
282                            };
283
284                            self.buf.drain(..pos);
285                            return Poll::Ready(Some(Ok(resp?)));
286                        }
287                    }
288                }
289            }
290        } else {
291            Poll::Ready(Some(Err(error::Error::FutureAlreadyCompleted)))
292        }
293    }
294}
295
296/// Represents the amount of filtering that can be done to streams on Twitter's side.
297///
298/// According to Twitter's documentation, "When displaying a stream of Tweets to end users
299/// (dashboards or live feeds at a presentation or conference, for example) it is suggested that
300/// you set this value to medium."
301#[derive(Copy, Clone, Debug, Deserialize, Serialize)]
302pub enum FilterLevel {
303    /// No filtering.
304    #[serde(rename = "none")]
305    None,
306    /// A light amount of filtering.
307    #[serde(rename = "low")]
308    Low,
309    /// A medium amount of filtering.
310    #[serde(rename = "medium")]
311    Medium,
312}
313
314/// `Display` impl to turn `FilterLevel` variants into the form needed for stream parameters. This
315/// is basically "the variant name, in lowercase".
316// TODO Probably can remove this somehow
317impl ::std::fmt::Display for FilterLevel {
318    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
319        match *self {
320            FilterLevel::None => write!(f, "none"),
321            FilterLevel::Low => write!(f, "low"),
322            FilterLevel::Medium => write!(f, "medium"),
323        }
324    }
325}
326
/// Represents a `TwitterStream` before it is started. Use the various methods to build
/// up the filters on your stream.
///
/// Bear in mind that the `track`, `follow` and `locations` filters are `OR`ed rather than `AND`ed
/// together. E.g. if you specify a user id to follow and a phrase to track, you will receive
/// all tweets that match (user id OR phrase), NOT (user id AND phrase).
/// For more details see the [official docs](https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters)
///
/// __Note__: The user __must__ specify at least one `track`, `follow` or `locations` filter or else
/// the stream will __fail__ at point of connection.
pub struct StreamBuilder {
    // Endpoint the request is posted to when the stream is started.
    url: &'static str,
    // User IDs, sent comma-separated as the `follow` parameter when non-empty.
    follow: Vec<u64>,
    // Phrases, sent comma-separated as the `track` parameter when non-empty.
    track: Vec<String>,
    // Language codes, sent comma-separated as the `language` parameter when non-empty.
    language: Vec<String>,
    // Bounding boxes, sent comma-separated as the `locations` parameter when non-empty.
    locations: Vec<BoundingBox>,
    // Optional `filter_level` parameter; omitted from the request when `None`.
    filter_level: Option<FilterLevel>,
}
345
346impl StreamBuilder {
347    fn new(url: &'static str) -> Self {
348        StreamBuilder {
349            url,
350            follow: Vec::new(),
351            track: Vec::new(),
352            language: Vec::new(),
353            locations: Vec::new(),
354            filter_level: None,
355        }
356    }
357
358    /// Filter stream to only return Tweets relating to given user IDs.
359    /// ### Example
360    /// ```rust,no_run
361    /// # fn main() {
362    /// # let token: egg_mode::Token = unimplemented!();
363    /// use egg_mode::stream::filter;
364    /// let stream = filter()
365    ///     // View tweets related to BBC news, the Guardian and the New York Times
366    ///     .follow(&[612473, 87818409, 807095])
367    ///     .start(&token);
368    /// # }
369    /// ```
370    pub fn follow(mut self, to_follow: &[u64]) -> Self {
371        self.follow.extend(to_follow.iter());
372        self
373    }
374
375    /// Filter stream to only return Tweets containing given phrases.
376    ///
377    /// A phrase may be one or more terms separated by spaces, and a phrase will match if all
378    /// of the terms in the phrase are present in the Tweet, regardless of order and ignoring case.
379    pub fn track<I: IntoIterator<Item = S>, S: AsRef<str>>(mut self, to_track: I) -> Self {
380        self.track
381            .extend(to_track.into_iter().map(|s| s.as_ref().to_string()));
382        self
383    }
384
385    /// Filter stream to only return Tweets that have been detected as being written
386    /// in the specified languages.
387    ///
388    /// Languages are specified as a list of
389    /// [BCP 47](http://tools.ietf.org/html/bcp47) language identifiers
390    /// corresponding to any of the languages listed on Twitter’s
391    /// [advanced search](https://twitter.com/search-advancedpage) page.
392    ///
393    /// __Note__ This library does __not__ validate the language codes.
394    pub fn language<I: IntoIterator<Item = S>, S: AsRef<str>>(mut self, languages: I) -> Self {
395        self.language
396            .extend(languages.into_iter().map(|s| s.as_ref().to_string()));
397        self
398    }
399
400    /// A list of bounding boxes by which to filter Tweets
401    ///
402    /// ### Example
403    /// ```rust,no_run
404    /// # fn main() {
405    /// # let token: egg_mode::Token = unimplemented!();
406    /// use egg_mode::stream::{filter, BoundingBox};
407    /// let stream = filter()
408    ///     // Only show tweets sent from New York
409    ///     .locations(&[BoundingBox::new((-74.0,40.0),(-73.0,41.0))])
410    ///     .start(&token);
411    /// # }
412    /// ```
413    pub fn locations(mut self, locations: &[BoundingBox]) -> Self {
414        self.locations.extend(locations.iter());
415        self
416    }
417
418    /// Applies the given `FilterLevel` to the stream. Tweets with a `filter_level` below the given
419    /// value will not be shown in the stream.
420    ///
421    /// When displaying a stream of Tweets to end users
422    /// (dashboards or live feeds at a presentation or conference, for example) it is suggested
423    /// that you set this value to medium.
424    pub fn filter_level(self, filter_level: FilterLevel) -> StreamBuilder {
425        StreamBuilder {
426            filter_level: Some(filter_level),
427            ..self
428        }
429    }
430
431    /// Finalizes the stream parameters and returns the resulting `TwitterStream`.
432    pub fn start(self, token: &Token) -> TwitterStream {
433        // Re connection failure, arguably this library should check that either 'track' or
434        // 'follow' exist and return an error if not. However, in such a case the request is not
435        // 'invalid' from POV of twitter api, rather it is invalid at the application level.
436        // So I think the current behaviour make sense.
437
438        let mut params =
439            ParamList::new().add_opt_param("filter_level", self.filter_level.map_string());
440
441        if !self.follow.is_empty() {
442            let to_follow = self
443                .follow
444                .iter()
445                .map(|id| id.to_string())
446                .collect::<Vec<String>>()
447                .join(",");
448            params.add_param_ref("follow", to_follow);
449        }
450
451        if !self.track.is_empty() {
452            let to_track = self.track.join(",");
453            params.add_param_ref("track", to_track);
454        }
455
456        if !self.language.is_empty() {
457            let langs = self.language.join(",");
458            params.add_param_ref("language", langs);
459        }
460
461        if !self.locations.is_empty() {
462            let locs = self
463                .locations
464                .iter()
465                .map(|bb| bb.to_string())
466                .collect::<Vec<String>>()
467                .join(",");
468            params.add_param_ref("locations", locs);
469        }
470
471        let req = post(self.url, token, Some(&params));
472
473        TwitterStream::new(req)
474    }
475}
476
477/// Begins building a request to a filtered public stream.
478pub fn filter() -> StreamBuilder {
479    StreamBuilder::new(links::stream::FILTER)
480}
481
482/// Opens a `TwitterStream` returning "a small random sample of all public statuses".
483///
484/// As sample streams don't have the same configuration options as filter streams,
485/// this directly returns a `TwitterStream`, rather than going through a [`StreamBuilder`]. To apply
486/// filter options on the public stream, start with [`filter`] and add parameters to the
487/// [`StreamBuilder`] returned there.
488///
489/// [`StreamBuilder`]: struct.StreamBuilder.html
490/// [`filter`]: fn.filter.html
491pub fn sample(token: &Token) -> TwitterStream {
492    let req = get(links::stream::SAMPLE, token, None);
493    TwitterStream::new(req)
494}
495
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
/// Represents a bounding box of (longitude, latitude) pairs.
///
/// NOTE(review): `BoundingBox::new` currently stores the corners exactly as given and performs
/// no range or consistency checks, so the values are not actually guaranteed to be in-bounds.
pub struct BoundingBox {
    // (longitude, latitude) of the southwest corner.
    southwest: (f64, f64),
    // (longitude, latitude) of the northeast corner.
    northeast: (f64, f64),
}
504
505impl ::std::fmt::Display for BoundingBox {
506    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
507        write!(
508            f,
509            "{},{},{},{}",
510            self.southwest.0, self.southwest.1, self.northeast.0, self.northeast.1
511        )
512    }
513}
514
impl BoundingBox {
    /// New BoundingBox. Expects (longitude, latitude) pairs describing the southwest and
    /// northeast points of the bounding box.
    ///
    /// The values are stored as given; no bounds or consistency checks are performed yet (see
    /// the TODOs in the body).
    pub fn new(southwest: (f64, f64), northeast: (f64, f64)) -> BoundingBox {
        // TODO integrate with `bounding_box` in `place` module.
        // TODO check consistency
        BoundingBox {
            southwest,
            northeast,
        }
    }
}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::tests::load_file;

    /// Reads the fixture at `path` and deserializes it as a single stream message.
    fn load_stream(path: &str) -> StreamMessage {
        let raw = load_file(path);
        ::serde_json::from_str(&raw).unwrap()
    }

    /// A sample tweet payload must be classified as `StreamMessage::Tweet`.
    #[test]
    fn parse_tweet_stream() {
        match load_stream("sample_payloads/sample-stream.json") {
            StreamMessage::Tweet(_) => {}
            _ => panic!("Not a tweet"),
        }
    }

    /// Empty input must be reported as a keep-alive `StreamMessage::Ping`.
    #[test]
    fn parse_empty_stream() {
        match StreamMessage::from_str("").unwrap() {
            StreamMessage::Ping => {}
            _ => panic!("Not a ping"),
        }
    }
}