egg_mode/stream/mod.rs
1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5//! Access to the Streaming API.
6//!
7//! The Streaming API gives real-time access to tweets, narrowed by
8//! search phrases, user id or location. A standard user is able to filter by up to
9//! 400 keywords, 5,000 user ids and 25 locations.
10//! See the [official documentation](https://developer.twitter.com/en/docs/tweets/filter-realtime/overview) for more details.
11//!
12//! ### Example
13//! ```rust,no_run
14//! # #[tokio::main]
15//! # async fn main() {
16//! # let token: egg_mode::Token = unimplemented!();
17//! use egg_mode::stream::{filter, StreamMessage};
18//! use futures::{Stream, TryStreamExt};
19//!
20//! let stream = filter()
21//! // find tweets mentioning any of the following:
22//! .track(&["rustlang", "python", "java", "javascript"])
23//! .start(&token);
24//!
25//! stream.try_for_each(|m| {
26//! // Check the message type and print tweet to console
27//! if let StreamMessage::Tweet(tweet) = m {
28//! println!("Received tweet from {}:\n{}\n", tweet.user.unwrap().name, tweet.text);
29//! }
30//! futures::future::ok(())
31//! }).await.expect("Stream error");
32//! # }
33//! ```
34//! ### Connection notes
//! Maintaining a stable streaming connection requires a certain amount of effort to take
//! account of random disconnects, network resets and stalls. The key points are:
37//!
38//! * The Twitter API sends a Ping message every 30 seconds of message inactivity. So set a timeout
39//! such that after (say) 1 minute of inactivity, the client bounces the connection. This will protect
40//! against network stalls
//! * Twitter will rate-limit reconnect attempts. So attempt connections with a linear or exponential
42//! backoff strategy
43//! * In the case of an unreliable connection (e.g. mobile network), fall back to the polling API
44//!
45//! The [official guide](https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/connecting) has more information.
46use std::future::Future;
47use std::pin::Pin;
48use std::str::FromStr;
49use std::task::{Context, Poll};
50use std::{self, io};
51
52use futures::Stream;
53use hyper::client::ResponseFuture;
54use hyper::{Body, Request};
55use serde::de::Error;
56use serde::{Deserialize, Deserializer, Serialize};
57use serde_json;
58
59use crate::auth::Token;
60use crate::common::*;
61use crate::tweet::Tweet;
62use crate::{error, links};
63
64// TODO rewrite this
65// https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
/// Represents the kinds of messages that can be sent over Twitter's Streaming API.
///
/// Values of this type are produced either by parsing a line of text with `FromStr` (where a
/// blank line becomes `Ping`) or by deserializing a JSON payload.
#[derive(Debug)]
pub enum StreamMessage {
    /// A blank line, sent periodically to keep the connection alive.
    Ping,
    /// A list of accounts the authenticated user follows, sent at the beginning of the session for
    /// user streams.
    FriendList(Vec<u64>),
    /// A new tweet.
    ///
    /// Note that the `entities` inside the `user` field will be empty for tweets received via the
    /// Streaming API.
    Tweet(Tweet),
    /// Notice given when a user deletes a post.
    ///
    /// Clients are expected to comply with these notices by removing the status "from memory and
    /// any storage or archive, even in the rare case where a deletion message arrives earlier in
    /// the stream than the Tweet it references."
    Delete {
        /// The status that was deleted.
        status_id: u64,
        /// The user that deleted the status.
        user_id: u64,
    },
    /// Notice given when a user removes geolocation information from their profile.
    ///
    /// Clients are expected to comply by deleting cached geolocation information from tweets by
    /// the given user, for any tweets up to and including the given status ID. According to
    /// Twitter's documentation, "These messages may also arrive before a Tweet which falls into
    /// the specified range, although this is rare."
    ScrubGeo {
        /// The user whose geolocation information needs to be scrubbed.
        user_id: u64,
        /// The last status ID to scrub information from.
        up_to_status_id: u64,
    },
    /// Placeholder message used to indicate that a specific tweet has been withheld in certain
    /// countries.
    StatusWithheld {
        /// The status that was withheld.
        status_id: u64,
        /// The user that posted the status.
        user_id: u64,
        /// A list of uppercase two-character country codes listing the countries where the tweet
        /// was withheld.
        withheld_in_countries: Vec<String>,
    },
    /// Placeholder message used to indicate that a specific user's content has been withheld in
    /// certain countries.
    UserWithheld {
        /// The user whose content was withheld.
        user_id: u64,
        /// A list of uppercase two-character country codes listing the countries where the content
        /// was withheld.
        withheld_in_countries: Vec<String>,
    },
    /// An error message that may be delivered immediately prior to Twitter disconnecting the
    /// stream.
    ///
    /// Note that if the stream is disconnected due to network issues or the client reading
    /// messages too slowly, it's possible that this message may not be received.
    ///
    /// The enclosed values are an error code and error description. A non-exhaustive list of error
    /// codes and their associated reasons is available in [Twitter's stream
    /// documentation][stream-doc], under "Disconnect messages (disconnect)".
    ///
    /// [stream-doc]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
    Disconnect(u64, String),
    /// An unhandled message payload.
    ///
    /// Twitter can add new streaming messages to the API, and egg-mode includes them here so that
    /// they can be used before egg-mode has a chance to handle them.
    Unknown(serde_json::Value),
    //TODO: stall warnings? "follows over limit" warnings? (other warnings?)
}
141
142impl<'de> Deserialize<'de> for StreamMessage {
143 fn deserialize<D>(deser: D) -> Result<StreamMessage, D::Error>
144 where
145 D: Deserializer<'de>,
146 {
147 macro_rules! fetch {
148 ($input: ident, $key: expr) => {
149 $input
150 .get($key)
151 .and_then(|val| serde_json::from_value(val.clone()).ok())
152 .ok_or_else(|| D::Error::custom("Failed"))
153 };
154 }
155
156 let input = serde_json::Value::deserialize(deser)?;
157 let msg = if let Some(del) = input.get("delete").and_then(|d| d.get("status")) {
158 StreamMessage::Delete {
159 status_id: fetch!(del, "id")?,
160 user_id: fetch!(del, "user_id")?,
161 }
162 } else if let Some(scrub) = input.get("scrub_geo") {
163 StreamMessage::ScrubGeo {
164 user_id: fetch!(scrub, "user_id")?,
165 up_to_status_id: fetch!(scrub, "up_to_status_id")?,
166 }
167 } else if let Some(tweet) = input.get("status_withheld") {
168 StreamMessage::StatusWithheld {
169 status_id: fetch!(tweet, "id")?,
170 user_id: fetch!(tweet, "user_id")?,
171 withheld_in_countries: fetch!(tweet, "withheld_in_countries")?,
172 }
173 } else if let Some(user) = input.get("user_withheld") {
174 StreamMessage::UserWithheld {
175 user_id: fetch!(user, "id")?,
176 withheld_in_countries: fetch!(user, "withheld_in_countries")?,
177 }
178 } else if let Some(err) = input.get("disconnect") {
179 StreamMessage::Disconnect(fetch!(err, "code")?, fetch!(err, "reason")?)
180 } else if let Some(friends) = input.get("friends") {
181 StreamMessage::FriendList(
182 serde_json::from_value(friends.clone())
183 .map_err(|e| D::Error::custom(format!("{}", e)))?,
184 )
185 // TODO remove clone?
186 } else if let Ok(tweet) = serde_json::from_value::<Tweet>(input.clone()) {
187 StreamMessage::Tweet(tweet)
188 } else {
189 StreamMessage::Unknown(input.clone())
190 };
191 Ok(msg)
192 }
193}
194
195impl FromStr for StreamMessage {
196 type Err = error::Error;
197 fn from_str(input: &str) -> Result<Self, error::Error> {
198 let input = input.trim();
199 if input.is_empty() {
200 Ok(StreamMessage::Ping)
201 } else {
202 Ok(serde_json::from_str(input)?)
203 }
204 }
205}
206
/// A `Stream` that represents a connection to the Twitter Streaming API.
///
/// Each item is either a parsed `StreamMessage` or the error that interrupted the stream. The
/// request is only sent once the stream is polled for the first time.
#[must_use = "Streams are lazy and do nothing unless polled"]
pub struct TwitterStream {
    /// Bytes read from the response body that have not yet been split off as a message.
    buf: Vec<u8>,
    /// The request to send; taken (leaving `None`) the first time the stream is polled.
    request: Option<Request<Body>>,
    /// The in-flight response future, kept while the response is still pending.
    response: Option<ResponseFuture>,
    /// The response body being read, set once a successful response has arrived.
    body: Option<Body>,
}
215
216impl TwitterStream {
217 pub(crate) fn new(request: Request<Body>) -> TwitterStream {
218 TwitterStream {
219 buf: vec![],
220 request: Some(request),
221 response: None,
222 body: None,
223 }
224 }
225}
226
227impl Stream for TwitterStream {
228 type Item = Result<StreamMessage, error::Error>;
229
230 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
231 if let Some(req) = self.request.take() {
232 self.response = Some(get_response(req));
233 }
234
235 if let Some(mut resp) = self.response.take() {
236 match Pin::new(&mut resp).poll(cx) {
237 Poll::Pending => {
238 self.response = Some(resp);
239 return Poll::Pending;
240 }
241 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
242 Poll::Ready(Ok(resp)) => {
243 let status = resp.status();
244 if !status.is_success() {
245 //TODO: should i try to pull the response regardless?
246 return Poll::Ready(Some(Err(error::Error::BadStatus(status))));
247 }
248
249 self.body = Some(resp.into_body());
250 }
251 }
252 }
253
254 if let Some(mut body) = self.body.take() {
255 loop {
256 match Pin::new(&mut body).poll_next(cx) {
257 Poll::Pending => {
258 self.body = Some(body);
259 return Poll::Pending;
260 }
261 Poll::Ready(None) => {
262 return Poll::Ready(None);
263 }
264 Poll::Ready(Some(Err(e))) => {
265 self.body = Some(body);
266 return Poll::Ready(Some(Err(e.into())));
267 }
268 Poll::Ready(Some(Ok(chunk))) => {
269 self.buf.extend(&*chunk);
270
271 if let Some(pos) = self.buf.windows(2).position(|w| w == b"\r\n") {
272 self.body = Some(body);
273 let pos = pos + 2;
274 let resp = if let Ok(msg_str) = std::str::from_utf8(&self.buf[..pos]) {
275 StreamMessage::from_str(msg_str)
276 } else {
277 Err(io::Error::new(
278 io::ErrorKind::InvalidData,
279 "stream did not contain valid UTF-8",
280 )
281 .into())
282 };
283
284 self.buf.drain(..pos);
285 return Poll::Ready(Some(Ok(resp?)));
286 }
287 }
288 }
289 }
290 } else {
291 Poll::Ready(Some(Err(error::Error::FutureAlreadyCompleted)))
292 }
293 }
294}
295
296/// Represents the amount of filtering that can be done to streams on Twitter's side.
297///
298/// According to Twitter's documentation, "When displaying a stream of Tweets to end users
299/// (dashboards or live feeds at a presentation or conference, for example) it is suggested that
300/// you set this value to medium."
301#[derive(Copy, Clone, Debug, Deserialize, Serialize)]
302pub enum FilterLevel {
303 /// No filtering.
304 #[serde(rename = "none")]
305 None,
306 /// A light amount of filtering.
307 #[serde(rename = "low")]
308 Low,
309 /// A medium amount of filtering.
310 #[serde(rename = "medium")]
311 Medium,
312}
313
314/// `Display` impl to turn `FilterLevel` variants into the form needed for stream parameters. This
315/// is basically "the variant name, in lowercase".
316// TODO Probably can remove this somehow
317impl ::std::fmt::Display for FilterLevel {
318 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
319 match *self {
320 FilterLevel::None => write!(f, "none"),
321 FilterLevel::Low => write!(f, "low"),
322 FilterLevel::Medium => write!(f, "medium"),
323 }
324 }
325}
326
/// Represents a `TwitterStream` before it is started. Use the various methods to build
/// up the filters on your stream.
///
/// Bear in mind that the `track`, `follow` and `locations` filters are `OR`ed rather than `AND`ed
/// together. E.g. if you specify a user id to follow and a phrase to track, you will receive
/// all tweets that match (user id OR phrase), NOT (user id AND phrase).
/// For more details see the [official docs](https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters)
///
/// __Note__: The user __must__ specify at least one `track`, `follow` or `locations` filter or else
/// the stream will __fail__ at point of connection.
pub struct StreamBuilder {
    /// The endpoint the stream request is posted to.
    url: &'static str,
    /// User ids sent as the comma-separated `follow` parameter.
    follow: Vec<u64>,
    /// Phrases sent as the comma-separated `track` parameter.
    track: Vec<String>,
    /// Language identifiers sent as the comma-separated `language` parameter.
    language: Vec<String>,
    /// Bounding boxes sent as the comma-separated `locations` parameter.
    locations: Vec<BoundingBox>,
    /// Optional value for the `filter_level` parameter; nothing is set when `None`.
    filter_level: Option<FilterLevel>,
}
345
346impl StreamBuilder {
347 fn new(url: &'static str) -> Self {
348 StreamBuilder {
349 url,
350 follow: Vec::new(),
351 track: Vec::new(),
352 language: Vec::new(),
353 locations: Vec::new(),
354 filter_level: None,
355 }
356 }
357
358 /// Filter stream to only return Tweets relating to given user IDs.
359 /// ### Example
360 /// ```rust,no_run
361 /// # fn main() {
362 /// # let token: egg_mode::Token = unimplemented!();
363 /// use egg_mode::stream::filter;
364 /// let stream = filter()
365 /// // View tweets related to BBC news, the Guardian and the New York Times
366 /// .follow(&[612473, 87818409, 807095])
367 /// .start(&token);
368 /// # }
369 /// ```
370 pub fn follow(mut self, to_follow: &[u64]) -> Self {
371 self.follow.extend(to_follow.iter());
372 self
373 }
374
375 /// Filter stream to only return Tweets containing given phrases.
376 ///
377 /// A phrase may be one or more terms separated by spaces, and a phrase will match if all
378 /// of the terms in the phrase are present in the Tweet, regardless of order and ignoring case.
379 pub fn track<I: IntoIterator<Item = S>, S: AsRef<str>>(mut self, to_track: I) -> Self {
380 self.track
381 .extend(to_track.into_iter().map(|s| s.as_ref().to_string()));
382 self
383 }
384
385 /// Filter stream to only return Tweets that have been detected as being written
386 /// in the specified languages.
387 ///
388 /// Languages are specified as a list of
389 /// [BCP 47](http://tools.ietf.org/html/bcp47) language identifiers
390 /// corresponding to any of the languages listed on Twitter’s
391 /// [advanced search](https://twitter.com/search-advancedpage) page.
392 ///
393 /// __Note__ This library does __not__ validate the language codes.
394 pub fn language<I: IntoIterator<Item = S>, S: AsRef<str>>(mut self, languages: I) -> Self {
395 self.language
396 .extend(languages.into_iter().map(|s| s.as_ref().to_string()));
397 self
398 }
399
400 /// A list of bounding boxes by which to filter Tweets
401 ///
402 /// ### Example
403 /// ```rust,no_run
404 /// # fn main() {
405 /// # let token: egg_mode::Token = unimplemented!();
406 /// use egg_mode::stream::{filter, BoundingBox};
407 /// let stream = filter()
408 /// // Only show tweets sent from New York
409 /// .locations(&[BoundingBox::new((-74.0,40.0),(-73.0,41.0))])
410 /// .start(&token);
411 /// # }
412 /// ```
413 pub fn locations(mut self, locations: &[BoundingBox]) -> Self {
414 self.locations.extend(locations.iter());
415 self
416 }
417
418 /// Applies the given `FilterLevel` to the stream. Tweets with a `filter_level` below the given
419 /// value will not be shown in the stream.
420 ///
421 /// When displaying a stream of Tweets to end users
422 /// (dashboards or live feeds at a presentation or conference, for example) it is suggested
423 /// that you set this value to medium.
424 pub fn filter_level(self, filter_level: FilterLevel) -> StreamBuilder {
425 StreamBuilder {
426 filter_level: Some(filter_level),
427 ..self
428 }
429 }
430
431 /// Finalizes the stream parameters and returns the resulting `TwitterStream`.
432 pub fn start(self, token: &Token) -> TwitterStream {
433 // Re connection failure, arguably this library should check that either 'track' or
434 // 'follow' exist and return an error if not. However, in such a case the request is not
435 // 'invalid' from POV of twitter api, rather it is invalid at the application level.
436 // So I think the current behaviour make sense.
437
438 let mut params =
439 ParamList::new().add_opt_param("filter_level", self.filter_level.map_string());
440
441 if !self.follow.is_empty() {
442 let to_follow = self
443 .follow
444 .iter()
445 .map(|id| id.to_string())
446 .collect::<Vec<String>>()
447 .join(",");
448 params.add_param_ref("follow", to_follow);
449 }
450
451 if !self.track.is_empty() {
452 let to_track = self.track.join(",");
453 params.add_param_ref("track", to_track);
454 }
455
456 if !self.language.is_empty() {
457 let langs = self.language.join(",");
458 params.add_param_ref("language", langs);
459 }
460
461 if !self.locations.is_empty() {
462 let locs = self
463 .locations
464 .iter()
465 .map(|bb| bb.to_string())
466 .collect::<Vec<String>>()
467 .join(",");
468 params.add_param_ref("locations", locs);
469 }
470
471 let req = post(self.url, token, Some(¶ms));
472
473 TwitterStream::new(req)
474 }
475}
476
/// Begins building a request to a filtered public stream.
///
/// The returned `StreamBuilder` starts with no filters; add at least one with its `track`,
/// `follow` or `locations` methods, then call `start` to obtain the `TwitterStream`.
pub fn filter() -> StreamBuilder {
    StreamBuilder::new(links::stream::FILTER)
}
481
482/// Opens a `TwitterStream` returning "a small random sample of all public statuses".
483///
484/// As sample streams don't have the same configuration options as filter streams,
485/// this directly returns a `TwitterStream`, rather than going through a [`StreamBuilder`]. To apply
486/// filter options on the public stream, start with [`filter`] and add parameters to the
487/// [`StreamBuilder`] returned there.
488///
489/// [`StreamBuilder`]: struct.StreamBuilder.html
490/// [`filter`]: fn.filter.html
491pub fn sample(token: &Token) -> TwitterStream {
492 let req = get(links::stream::SAMPLE, token, None);
493 TwitterStream::new(req)
494}
495
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
/// Represents a bounding box of (longitude, latitude) pairs.
///
/// The box is described by its southwest and northeast corners. Its `Display` impl renders the
/// four numbers comma-separated, southwest corner first.
///
/// Note that the coordinates are currently not validated: `BoundingBox::new` stores them exactly
/// as given, so it is up to the caller to supply in-bounds values.
pub struct BoundingBox {
    /// The southwest corner, as (longitude, latitude).
    southwest: (f64, f64),
    /// The northeast corner, as (longitude, latitude).
    northeast: (f64, f64),
}
504
505impl ::std::fmt::Display for BoundingBox {
506 fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
507 write!(
508 f,
509 "{},{},{},{}",
510 self.southwest.0, self.southwest.1, self.northeast.0, self.northeast.1
511 )
512 }
513}
514
impl BoundingBox {
    /// New BoundingBox. Expects (longitude, latitude) pairs describing the southwest and
    /// northeast points of the bounding box.
    ///
    /// The values are stored as given; no range or consistency checks are performed yet (see
    /// the TODOs below), so the caller is responsible for passing in-bounds coordinates.
    pub fn new(southwest: (f64, f64), northeast: (f64, f64)) -> BoundingBox {
        // TODO integrate with `bounding_box` in `place` module.
        // TODO check consistency
        BoundingBox {
            southwest,
            northeast,
        }
    }
}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::tests::load_file;

    /// Loads the payload stored at `path` and parses it as a stream message, panicking if the
    /// payload cannot be parsed.
    fn load_stream(path: &str) -> StreamMessage {
        let payload = load_file(path);
        ::serde_json::from_str(&payload).unwrap()
    }

    #[test]
    fn parse_tweet_stream() {
        match load_stream("sample_payloads/sample-stream.json") {
            StreamMessage::Tweet(_) => {}
            _ => panic!("Not a tweet"),
        }
    }

    #[test]
    fn parse_empty_stream() {
        match StreamMessage::from_str("").unwrap() {
            StreamMessage::Ping => {}
            _ => panic!("Not a ping"),
        }
    }
}