twitter_stream/
lib.rs

1#![doc(html_root_url = "https://docs.rs/twitter-stream/0.13.0")]
2
3/*!
4# Twitter Stream
5
6A library for listening on Twitter Streaming API.
7
8## Usage
9
10Add this to your `Cargo.toml`:
11
12```toml
13[dependencies]
14futures = "0.3"
15tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
16twitter-stream = "0.13"
17```
18
19## Overview
20
21Here is a basic example that prints public mentions to @Twitter in JSON format:
22
23```no_run
24use futures::prelude::*;
25use twitter_stream::{Token, TwitterStream};
26
27# #[tokio::main]
28# async fn main() {
29let token = Token::from_parts("consumer_key", "consumer_secret", "access_key", "access_secret");
30
31TwitterStream::track("@Twitter", &token)
32    .try_flatten_stream()
33    .try_for_each(|json| {
34        println!("{}", json);
35        future::ok(())
36    })
37    .await
38    .unwrap();
39# }
40```
41
42See the [`TwitterStream`] type documentation for details.
43
44## Streaming messages
45
46`TwitterStream` yields the raw JSON strings returned by the Streaming API. Each string value
47contains exactly one JSON value.
48
49The underlying Streaming API [sends a blank line][stalls] every 30 seconds as a "keep-alive" signal,
50but `TwitterStream` discards it so that you can always expect to yield a valid JSON string.
51On the other hand, this means that you cannot use the blank line to set a timeout on `Stream`-level.
52If you want the stream to time out on network stalls, set a timeout on the underlying
53HTTP connector, instead of the `Stream` (see the [`timeout` example] in the crate's repository
54for details).
55
56[stalls]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/connecting#stalls
57[`timeout` example]: https://github.com/tesaguri/twitter-stream-rs/blob/v0.13.0/examples/timeout.rs
58
59The JSON string usually, but not always, represents a [Tweet] object. When deserializing the JSON
60string, you should be able to handle any kind of JSON value. A possible implementation of
61deserialization would be like the following:
62
63[Tweet]: https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object
64
65```
66#[derive(serde::Deserialize)]
67#[serde(untagged)]
68enum StreamMessage {
69    Tweet(Tweet),
70    // Discards anything other than a Tweet.
71    // You can handle other message types as well by adding correspoiding variants.
72    Other(serde::de::IgnoredAny),
73}
74
75#[derive(serde::Deserialize)]
76struct Tweet { /* ... */ }
77```
78
79The [`echo_bot` example] in the crate's repository shows an example of a `StreamMessage`
80implementation.
81
82[`echo_bot` example]: https://github.com/tesaguri/twitter-stream-rs/blob/v0.13.0/examples/echo_bot.rs
83
84See the [Twitter Developers Documentation][message-types] for the types and formats of the JSON
85messages.
86
87[message-types]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
88*/
89
90#![cfg_attr(docsrs, feature(doc_cfg))]
91#![deny(broken_intra_doc_links)]
92#![warn(missing_docs)]
93
94#[macro_use]
95mod util;
96
97pub mod builder;
98pub mod error;
99#[cfg(feature = "hyper")]
100#[cfg_attr(docsrs, doc(cfg(feature = "hyper")))]
101pub mod hyper;
102pub mod service;
103
104#[doc(no_inline)]
105pub use oauth_credentials::Credentials;
106
107pub use crate::builder::Builder;
108pub use crate::error::Error;
109
110use std::future::Future;
111use std::pin::Pin;
112use std::str;
113use std::task::{Context, Poll};
114
115use bytes::Bytes;
116use futures_core::{ready, Stream};
117use http::Response;
118use http::StatusCode;
119use http_body::Body;
120use pin_project_lite::pin_project;
121
122use crate::util::Lines;
123
124pin_project! {
125    /// A future returned by constructor methods which resolves to a [`TwitterStream`].
126    pub struct FutureTwitterStream<F> {
127        #[pin]
128        response: F,
129    }
130}
131
132pin_project! {
133    /// A listener for Twitter Streaming API, yielding JSON strings returned from the API.
134    pub struct TwitterStream<B> {
135        #[pin]
136        inner: Lines<B>,
137    }
138}
139
140/// A set of OAuth client credentials and token credentials used for authorizing requests
141/// to the Streaming API.
142pub type Token<C = String, T = String> = oauth_credentials::Token<C, T>;
143
144impl<B: Body> TwitterStream<B> {
145    /// Creates a `Builder` for `TwitterStream`.
146    pub fn builder<'a, C, A>(token: Token<C, A>) -> Builder<'a, Token<C, A>>
147    where
148        C: AsRef<str>,
149        A: AsRef<str>,
150    {
151        Builder::new(token)
152    }
153}
154
155#[cfg(feature = "hyper")]
156impl crate::hyper::TwitterStream {
157    /// Connect to the filter stream, yielding Tweets from the users specified by `follow` argument.
158    ///
159    /// This is a shorthand for `twitter_stream::Builder::new(token).follow(follow).listen()`.
160    /// For more specific configurations, use [`TwitterStream::builder`] or [`Builder::new`].
161    ///
162    /// # Panics
163    ///
164    /// This will panic if the underlying HTTPS connector failed to initialize.
165    pub fn follow<C, A>(follow: &[u64], token: &Token<C, A>) -> crate::hyper::FutureTwitterStream
166    where
167        C: AsRef<str>,
168        A: AsRef<str>,
169    {
170        Builder::new(token.as_ref()).follow(follow).listen()
171    }
172
173    /// Connect to the filter stream, yielding Tweets that matches the query specified by
174    /// `track` argument.
175    ///
176    /// This is a shorthand for `twitter_stream::Builder::new(token).track(track).listen()`.
177    /// For more specific configurations, use [`TwitterStream::builder`] or [`Builder::new`].
178    ///
179    /// # Panics
180    ///
181    /// This will panic if the underlying HTTPS connector failed to initialize.
182    pub fn track<C, A>(track: &str, token: &Token<C, A>) -> crate::hyper::FutureTwitterStream
183    where
184        C: AsRef<str>,
185        A: AsRef<str>,
186    {
187        Builder::new(token.as_ref()).track(track).listen()
188    }
189
190    /// Connect to the filter stream, yielding geolocated Tweets falling within the specified
191    /// bounding boxes.
192    ///
193    /// This is a shorthand for `twitter_stream::Builder::new(token).locations(locations).listen()`.
194    /// For more specific configurations, use [`TwitterStream::builder`] or [`Builder::new`].
195    ///
196    /// # Panics
197    ///
198    /// This will panic if the underlying HTTPS connector failed to initialize.
199    pub fn locations<C, A>(
200        locations: &[builder::BoundingBox],
201        token: &Token<C, A>,
202    ) -> crate::hyper::FutureTwitterStream
203    where
204        C: AsRef<str>,
205        A: AsRef<str>,
206    {
207        Builder::new(token.as_ref()).locations(locations).listen()
208    }
209
210    /// Connect to the sample stream, yielding a "small random sample" of all public Tweets.
211    ///
212    /// This is a shorthand for `twitter_stream::Builder::new(token).listen()`.
213    /// For more specific configurations, use [`TwitterStream::builder`] or [`Builder::new`].
214    ///
215    /// # Panics
216    ///
217    /// This will panic if the underlying HTTPS connector failed to initialize.
218    pub fn sample<C, A>(token: &Token<C, A>) -> crate::hyper::FutureTwitterStream
219    where
220        C: AsRef<str>,
221        A: AsRef<str>,
222    {
223        Builder::new(token.as_ref()).listen()
224    }
225}
226
227impl<F, B, E> Future for FutureTwitterStream<F>
228where
229    F: Future<Output = Result<Response<B>, E>>,
230    B: Body,
231{
232    type Output = Result<TwitterStream<B>, Error<E>>;
233
234    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
235        let res = ready!(self.project().response.poll(cx).map_err(Error::Service)?);
236
237        if res.status() != StatusCode::OK {
238            return Poll::Ready(Err(Error::Http(res.status())));
239        }
240
241        let inner = Lines::new(res.into_body());
242
243        Poll::Ready(Ok(TwitterStream { inner }))
244    }
245}
246
247impl<B> Stream for TwitterStream<B>
248where
249    B: Body,
250{
251    type Item = Result<string::String<Bytes>, Error<B::Error>>;
252
253    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254        let mut this = self.project();
255
256        loop {
257            let line = match ready!(this.inner.as_mut().poll_next(cx)?) {
258                Some(t) => t,
259                None => return std::task::Poll::Ready(None),
260            };
261
262            if line.iter().all(|&c| is_json_whitespace(c)) {
263                continue;
264            }
265
266            str::from_utf8(&line).map_err(Error::Utf8)?;
267            let line = unsafe {
268                // Safety:
269                // - We have checked above that `line` is valid as UTF-8.
270                // - `Bytes` satisfies the requirements of `string::StableAsRef` trait
271                // (https://github.com/carllerche/string/pull/17)
272                string::String::<Bytes>::from_utf8_unchecked(line)
273            };
274            return Poll::Ready(Some(Ok(line)));
275        }
276    }
277}
278
279fn is_json_whitespace(c: u8) -> bool {
280    // RFC7159 ยง2
281    b" \t\n\r".contains(&c)
282}