#![doc(html_root_url = "https://docs.rs/twitter-stream/0.10.0-alpha.2")]
#![recursion_limit = "128"]
#[macro_use]
mod util;
pub mod error;
#[cfg(feature = "runtime")]
pub mod rt;
pub mod types;
mod gzip;
mod token;
pub use oauth::Credentials;
pub use crate::error::Error;
pub use crate::token::Token;
use std::borrow::Borrow;
use std::future::Future;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};
#[cfg(feature = "runtime")]
use std::time::Duration;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::{ready, FutureExt, StreamExt};
use http::response::Parts;
use hyper::body::{Body, Payload};
use hyper::client::connect::Connect;
use hyper::client::{Client, ResponseFuture};
use hyper::header::{
HeaderValue, ACCEPT_ENCODING, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE,
};
use hyper::Request;
use string::TryFrom;
use crate::gzip::MaybeGzip;
use crate::types::{FilterLevel, RequestMethod, StatusCode, Uri};
use crate::util::*;
#[derive(Clone, Debug)]
pub struct Builder<'a, T = Token> {
method: RequestMethod,
endpoint: Uri,
token: T,
inner: BuilderInner<'a>,
}
pub struct FutureTwitterStream {
response: MaybeTimeout<ResponseFuture>,
}
pub struct TwitterStream {
inner: Lines<MaybeGzip<MaybeTimeoutStream<Body>>>,
}
#[derive(Clone, Debug, oauth::Authorize)]
struct BuilderInner<'a> {
#[oauth1(skip)]
#[cfg(feature = "runtime")]
timeout: Option<Duration>,
#[oauth1(skip_if = "not")]
stall_warnings: bool,
filter_level: Option<FilterLevel>,
language: Option<&'a str>,
#[oauth1(encoded, fmt = "fmt_follow")]
follow: Option<&'a [u64]>,
track: Option<&'a str>,
#[oauth1(encoded, fmt = "fmt_locations")]
#[allow(clippy::type_complexity)]
locations: Option<&'a [((f64, f64), (f64, f64))]>,
#[oauth1(encoded)]
count: Option<i32>,
}
impl<'a, C, A> Builder<'a, Token<C, A>>
where
C: Borrow<str>,
A: Borrow<str>,
{
pub fn filter(token: Token<C, A>) -> Self {
const URI: &str = "https://stream.twitter.com/1.1/statuses/filter.json";
Self::custom(RequestMethod::POST, Uri::from_static(URI), token)
}
pub fn sample(token: Token<C, A>) -> Self {
const URI: &str = "https://stream.twitter.com/1.1/statuses/sample.json";
Self::custom(RequestMethod::GET, Uri::from_static(URI), token)
}
pub fn custom(method: RequestMethod, endpoint: Uri, token: Token<C, A>) -> Self {
Self {
method,
endpoint,
token,
inner: BuilderInner {
#[cfg(feature = "runtime")]
timeout: Some(Duration::from_secs(90)),
stall_warnings: false,
filter_level: None,
language: None,
follow: None,
track: None,
locations: None,
count: None,
},
}
}
#[cfg(feature = "tls")]
pub fn listen(&self) -> Result<FutureTwitterStream, error::TlsError> {
let conn = hyper_tls::HttpsConnector::new()?;
Ok(self.listen_with_client(&Client::builder().build::<_, Body>(conn)))
}
pub fn listen_with_client<Conn, B>(&self, client: &Client<Conn, B>) -> FutureTwitterStream
where
Conn: Connect + Sync + 'static,
Conn::Transport: 'static,
Conn::Future: 'static,
B: Default + From<Vec<u8>> + Payload + Unpin + Send + 'static,
B::Data: Send + Unpin,
{
let mut req = Request::builder();
req.method(self.method.clone())
.header(ACCEPT_ENCODING, HeaderValue::from_static("gzip"));
let mut oauth = oauth::Builder::new(self.token.client.as_ref(), oauth::HmacSha1);
oauth.token(self.token.token.as_ref());
let req = if RequestMethod::POST == self.method {
let oauth::Request {
authorization,
data,
} = oauth.post_form(&self.endpoint, &self.inner);
req.uri(self.endpoint.clone())
.header(AUTHORIZATION, Bytes::from(authorization))
.header(
CONTENT_TYPE,
HeaderValue::from_static("application/x-www-form-urlencoded"),
)
.header(CONTENT_LENGTH, Bytes::from(data.len().to_string()))
.body(data.into_bytes().into())
.unwrap()
} else {
let oauth::Request {
authorization,
data: uri,
} = oauth.build(self.method.as_ref(), &self.endpoint, &self.inner);
req.uri(uri)
.header(AUTHORIZATION, Bytes::from(authorization))
.body(B::default())
.unwrap()
};
let res = client.request(req);
FutureTwitterStream {
#[cfg(feature = "runtime")]
response: timeout(res, self.inner.timeout),
#[cfg(not(feature = "runtime"))]
response: timeout(res),
}
}
}
impl<'a, C, A> Builder<'a, Token<C, A>> {
pub fn method(&mut self, method: RequestMethod) -> &mut Self {
self.method = method;
self
}
pub fn endpoint(&mut self, endpoint: Uri) -> &mut Self {
self.endpoint = endpoint;
self
}
pub fn token(&mut self, token: Token<C, A>) -> &mut Self {
self.token = token;
self
}
#[cfg(feature = "runtime")]
pub fn timeout(&mut self, timeout: impl Into<Option<Duration>>) -> &mut Self {
self.inner.timeout = timeout.into();
self
}
pub fn stall_warnings(&mut self, stall_warnings: bool) -> &mut Self {
self.inner.stall_warnings = stall_warnings;
self
}
pub fn filter_level(&mut self, filter_level: impl Into<Option<FilterLevel>>) -> &mut Self {
self.inner.filter_level = filter_level.into();
self
}
pub fn language(&mut self, language: impl Into<Option<&'a str>>) -> &mut Self {
self.inner.language = language.into();
self
}
pub fn follow(&mut self, follow: impl Into<Option<&'a [u64]>>) -> &mut Self {
self.inner.follow = follow.into();
self
}
pub fn track(&mut self, track: impl Into<Option<&'a str>>) -> &mut Self {
self.inner.track = track.into();
self
}
pub fn locations(
&mut self,
locations: impl Into<Option<&'a [((f64, f64), (f64, f64))]>>,
) -> &mut Self {
self.inner.locations = locations.into();
self
}
pub fn count(&mut self, count: impl Into<Option<i32>>) -> &mut Self {
self.inner.count = count.into();
self
}
}
#[cfg(feature = "tls")]
impl TwitterStream {
pub fn filter<C, A>(token: Token<C, A>) -> Result<FutureTwitterStream, error::TlsError>
where
C: Borrow<str>,
A: Borrow<str>,
{
Builder::filter(token).listen()
}
pub fn sample<C, A>(token: Token<C, A>) -> Result<FutureTwitterStream, error::TlsError>
where
C: Borrow<str>,
A: Borrow<str>,
{
Builder::sample(token).listen()
}
}
impl Future for FutureTwitterStream {
type Output = Result<TwitterStream, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = ready!(self.response.poll_unpin(cx))?;
let (parts, body) = res.into_parts();
let Parts {
status, headers, ..
} = parts;
if StatusCode::OK != status {
return Poll::Ready(Err(Error::Http(status)));
}
let body = timeout_to_stream(&self.response, body);
let use_gzip = headers
.get_all(CONTENT_ENCODING)
.iter()
.any(|e| e == "gzip");
let inner = if use_gzip {
Lines::new(gzip::gzip(body))
} else {
Lines::new(gzip::identity(body))
};
Poll::Ready(Ok(TwitterStream { inner }))
}
}
impl Stream for TwitterStream {
type Item = Result<string::String<Bytes>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let line = ready_some!(self.inner.poll_next_unpin(cx))?;
if line.iter().all(|&c| is_json_whitespace(c)) {
continue;
}
let line = string::String::<Bytes>::try_from(line).map_err(Error::Utf8)?;
return Poll::Ready(Some(Ok(line)));
}
}
}
fn is_json_whitespace(c: u8) -> bool {
b" \t\n\r".contains(&c)
}