twitter_stream/lib.rs
1#![doc(html_root_url = "https://docs.rs/twitter-stream/0.13.0")]
2
3/*!
4# Twitter Stream
5
6A library for listening to the Twitter Streaming API.
7
8## Usage
9
10Add this to your `Cargo.toml`:
11
12```toml
13[dependencies]
14futures = "0.3"
15tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
16twitter-stream = "0.13"
17```
18
19## Overview
20
21Here is a basic example that prints public mentions to @Twitter in JSON format:
22
23```no_run
24use futures::prelude::*;
25use twitter_stream::{Token, TwitterStream};
26
27# #[tokio::main]
28# async fn main() {
29let token = Token::from_parts("consumer_key", "consumer_secret", "access_key", "access_secret");
30
31TwitterStream::track("@Twitter", &token)
32 .try_flatten_stream()
33 .try_for_each(|json| {
34 println!("{}", json);
35 future::ok(())
36 })
37 .await
38 .unwrap();
39# }
40```
41
42See the [`TwitterStream`] type documentation for details.
43
44## Streaming messages
45
46`TwitterStream` yields the raw JSON strings returned by the Streaming API. Each string value
47contains exactly one JSON value.
48
49The underlying Streaming API [sends a blank line][stalls] every 30 seconds as a "keep-alive" signal,
50but `TwitterStream` discards it so that you can always expect the stream to yield a valid JSON string.
51On the other hand, this means that you cannot use the blank line to set a timeout at the `Stream` level.
52If you want the stream to time out on network stalls, set a timeout on the underlying
53HTTP connector, instead of the `Stream` (see the [`timeout` example] in the crate's repository
54for details).
55
56[stalls]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/connecting#stalls
57[`timeout` example]: https://github.com/tesaguri/twitter-stream-rs/blob/v0.13.0/examples/timeout.rs
58
59The JSON string usually, but not always, represents a [Tweet] object. When deserializing the JSON
60string, you should be able to handle any kind of JSON value. A possible implementation of
61deserialization would be like the following:
62
63[Tweet]: https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object
64
65```
66#[derive(serde::Deserialize)]
67#[serde(untagged)]
68enum StreamMessage {
69 Tweet(Tweet),
70 // Discards anything other than a Tweet.
71 // You can handle other message types as well by adding corresponding variants.
72 Other(serde::de::IgnoredAny),
73}
74
75#[derive(serde::Deserialize)]
76struct Tweet { /* ... */ }
77```
78
79The [`echo_bot` example] in the crate's repository shows an example of a `StreamMessage`
80implementation.
81
82[`echo_bot` example]: https://github.com/tesaguri/twitter-stream-rs/blob/v0.13.0/examples/echo_bot.rs
83
84See the [Twitter Developers Documentation][message-types] for the types and formats of the JSON
85messages.
86
87[message-types]: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
88*/
89
90#![cfg_attr(docsrs, feature(doc_cfg))]
91#![deny(broken_intra_doc_links)]
92#![warn(missing_docs)]
93
94#[macro_use]
95mod util;
96
97pub mod builder;
98pub mod error;
99#[cfg(feature = "hyper")]
100#[cfg_attr(docsrs, doc(cfg(feature = "hyper")))]
101pub mod hyper;
102pub mod service;
103
104#[doc(no_inline)]
105pub use oauth_credentials::Credentials;
106
107pub use crate::builder::Builder;
108pub use crate::error::Error;
109
110use std::future::Future;
111use std::pin::Pin;
112use std::str;
113use std::task::{Context, Poll};
114
115use bytes::Bytes;
116use futures_core::{ready, Stream};
117use http::Response;
118use http::StatusCode;
119use http_body::Body;
120use pin_project_lite::pin_project;
121
122use crate::util::Lines;
123
124pin_project! {
125 /// A future returned by constructor methods which resolves to a [`TwitterStream`].
126 pub struct FutureTwitterStream<F> {
127 #[pin]
128 response: F,
129 }
130}
131
132pin_project! {
133 /// A listener for Twitter Streaming API, yielding JSON strings returned from the API.
134 pub struct TwitterStream<B> {
135 #[pin]
136 inner: Lines<B>,
137 }
138}
139
140/// A set of OAuth client credentials and token credentials used for authorizing requests
141/// to the Streaming API.
142pub type Token<C = String, T = String> = oauth_credentials::Token<C, T>;
143
144impl<B: Body> TwitterStream<B> {
145 /// Creates a `Builder` for `TwitterStream`.
146 pub fn builder<'a, C, A>(token: Token<C, A>) -> Builder<'a, Token<C, A>>
147 where
148 C: AsRef<str>,
149 A: AsRef<str>,
150 {
151 Builder::new(token)
152 }
153}
154
#[cfg(feature = "hyper")]
impl crate::hyper::TwitterStream {
    /// Connects to the filter stream and yields Tweets from the users listed in `follow`.
    ///
    /// Shorthand for `twitter_stream::Builder::new(token).follow(follow).listen()`.
    /// Use [`TwitterStream::builder`] or [`Builder::new`] when you need finer control.
    ///
    /// # Panics
    ///
    /// Panics if the underlying HTTPS connector fails to initialize.
    pub fn follow<C: AsRef<str>, A: AsRef<str>>(
        follow: &[u64],
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        Builder::new(token.as_ref()).follow(follow).listen()
    }

    /// Connects to the filter stream and yields Tweets that match the query given in `track`.
    ///
    /// Shorthand for `twitter_stream::Builder::new(token).track(track).listen()`.
    /// Use [`TwitterStream::builder`] or [`Builder::new`] when you need finer control.
    ///
    /// # Panics
    ///
    /// Panics if the underlying HTTPS connector fails to initialize.
    pub fn track<C: AsRef<str>, A: AsRef<str>>(
        track: &str,
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        Builder::new(token.as_ref()).track(track).listen()
    }

    /// Connects to the filter stream and yields geolocated Tweets that fall within the given
    /// bounding boxes.
    ///
    /// Shorthand for `twitter_stream::Builder::new(token).locations(locations).listen()`.
    /// Use [`TwitterStream::builder`] or [`Builder::new`] when you need finer control.
    ///
    /// # Panics
    ///
    /// Panics if the underlying HTTPS connector fails to initialize.
    pub fn locations<C: AsRef<str>, A: AsRef<str>>(
        locations: &[builder::BoundingBox],
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        Builder::new(token.as_ref()).locations(locations).listen()
    }

    /// Connects to the sample stream, which yields a "small random sample" of all public Tweets.
    ///
    /// Shorthand for `twitter_stream::Builder::new(token).listen()`.
    /// Use [`TwitterStream::builder`] or [`Builder::new`] when you need finer control.
    ///
    /// # Panics
    ///
    /// Panics if the underlying HTTPS connector fails to initialize.
    pub fn sample<C: AsRef<str>, A: AsRef<str>>(
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        Builder::new(token.as_ref()).listen()
    }
}
226
227impl<F, B, E> Future for FutureTwitterStream<F>
228where
229 F: Future<Output = Result<Response<B>, E>>,
230 B: Body,
231{
232 type Output = Result<TwitterStream<B>, Error<E>>;
233
234 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
235 let res = ready!(self.project().response.poll(cx).map_err(Error::Service)?);
236
237 if res.status() != StatusCode::OK {
238 return Poll::Ready(Err(Error::Http(res.status())));
239 }
240
241 let inner = Lines::new(res.into_body());
242
243 Poll::Ready(Ok(TwitterStream { inner }))
244 }
245}
246
247impl<B> Stream for TwitterStream<B>
248where
249 B: Body,
250{
251 type Item = Result<string::String<Bytes>, Error<B::Error>>;
252
253 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254 let mut this = self.project();
255
256 loop {
257 let line = match ready!(this.inner.as_mut().poll_next(cx)?) {
258 Some(t) => t,
259 None => return std::task::Poll::Ready(None),
260 };
261
262 if line.iter().all(|&c| is_json_whitespace(c)) {
263 continue;
264 }
265
266 str::from_utf8(&line).map_err(Error::Utf8)?;
267 let line = unsafe {
268 // Safety:
269 // - We have checked above that `line` is valid as UTF-8.
270 // - `Bytes` satisfies the requirements of `string::StableAsRef` trait
271 // (https://github.com/carllerche/string/pull/17)
272 string::String::<Bytes>::from_utf8_unchecked(line)
273 };
274 return Poll::Ready(Some(Ok(line)));
275 }
276 }
277}
278
/// Returns `true` if `c` is one of the four JSON whitespace bytes:
/// space, horizontal tab, line feed or carriage return.
fn is_json_whitespace(c: u8) -> bool {
    // RFC 7159 §2
    matches!(c, b' ' | b'\t' | b'\n' | b'\r')
}