#![doc(html_root_url = "https://docs.rs/twitter-stream/0.9.0")]
#![recursion_limit = "128"]
extern crate bytes;
extern crate cfg_if;
extern crate futures;
extern crate http;
extern crate hyper;
#[cfg(feature = "tls")]
extern crate hyper_tls;
extern crate libflate;
extern crate oauth1_request as oauth;
extern crate oauth1_request_derive;
#[cfg(feature = "serde")]
extern crate serde;
extern crate static_assertions;
extern crate string;
extern crate tokio_timer;
#[macro_use]
mod util;
pub mod error;
#[cfg(feature = "runtime")]
pub mod rt;
pub mod types;
mod gzip;
mod token;
pub use error::Error;
pub use token::Token;
use std::borrow::Borrow;
use std::time::Duration;
use bytes::Bytes;
use futures::{try_ready, Future, Poll, Stream};
use http::response::Parts;
use hyper::body::{Body, Payload};
use hyper::client::connect::Connect;
use hyper::client::{Client, ResponseFuture};
use hyper::header::{
HeaderValue, ACCEPT_ENCODING, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE,
};
use hyper::Request;
use oauth::OAuth1Authorize;
use oauth1_request_derive::OAuth1Authorize;
use string::TryFrom;
use gzip::MaybeGzip;
use types::{FilterLevel, RequestMethod, StatusCode, Uri};
use util::*;
macro_rules! def_stream {
(
$(#[$builder_attr:meta])*
pub struct $B:ident<$lifetime:tt, $T:ident $(=$TDefault:ty)*> {
$($arg:ident: $a_ty:ty),*;
$($setters:tt)*
}
$(#[$future_stream_attr:meta])*
pub struct $FS:ident {
$($fs_field:ident: $fsf_ty:ty,)*
}
$(#[$stream_attr:meta])*
pub struct $S:ident {
$($s_field:ident: $sf_ty:ty,)*
}
$(
$(#[$constructor_attr:meta])*
-
$(#[$s_constructor_attr:meta])*
pub fn $constructor:ident($Method:ident, $endpoint:expr);
)*
) => {
$(#[$builder_attr])*
pub struct $B<$lifetime, $T $(= $TDefault)*> {
$($arg: $a_ty,)*
inner: BuilderInner<$lifetime>,
}
def_builder_inner! {
$(#[$builder_attr])*
#[derive(OAuth1Authorize)]
struct BuilderInner<$lifetime> { $($setters)* }
}
$(#[$future_stream_attr])*
pub struct $FS {
$($fs_field: $fsf_ty,)*
}
$(#[$stream_attr])*
pub struct $S {
$($s_field: $sf_ty,)*
}
impl<$lifetime, C, A> $B<$lifetime, Token<C, A>>
where
C: Borrow<str>,
A: Borrow<str>,
{
$(
$(#[$constructor_attr])*
pub fn $constructor(token: Token<C, A>) -> Self {
$B::custom(RequestMethod::$Method, Uri::from_static($endpoint), token)
}
)*
pub fn custom(
method: RequestMethod,
endpoint: Uri,
token: Token<C, A>,
) -> Self
{
$B {
method,
endpoint,
token,
inner: BuilderInner::new(),
}
}
#[cfg(feature = "tls")]
pub fn listen(&self) -> Result<$FS, error::TlsError> {
let conn = hyper_tls::HttpsConnector::new(1)?;
Ok(self.listen_with_client(&Client::builder().build::<_, Body>(conn)))
}
pub fn listen_with_client<Conn, B>(&self, client: &Client<Conn, B>) -> $FS
where
Conn: Connect + Sync + 'static,
Conn::Transport: 'static,
Conn::Future: 'static,
B: Default + From<Vec<u8>> + Payload + Send + 'static,
B::Data: Send,
{
self.listen_with_client_(client)
}
}
impl<$lifetime, C, A> $B<$lifetime, Token<C, A>> {
pub fn method(&mut self, method: RequestMethod) -> &mut Self {
self.method = method;
self
}
pub fn endpoint(&mut self, endpoint: Uri) -> &mut Self {
self.endpoint = endpoint;
self
}
pub fn token(&mut self, token: Token<C, A>) -> &mut Self {
self.token = token;
self
}
def_setters! { $($setters)* }
}
#[cfg(feature = "tls")]
impl $S {
$(
$(#[$s_constructor_attr])*
pub fn $constructor<C, A>(token: Token<C, A>) -> Result<$FS, error::TlsError>
where
C: Borrow<str>,
A: Borrow<str>,
{
$B::$constructor(token).listen()
}
)*
}
};
}
macro_rules! def_builder_inner {
(
$(#[$attr:meta])*
struct $BI:ident<$lifetime:tt> {
$($(#[$field_attr:meta])* $field:ident: $t:ty = $default:expr,)*
}
) => {
$(#[$attr])*
struct $BI<$lifetime> { $($(#[$field_attr])* $field: $t),* }
impl<'a> $BI<'a> { fn new() -> Self { $BI { $($field: $default),* } } }
}
}
macro_rules! def_setters {
(@parse $(#[$attrs:meta])*; #[oauth1($($_ignored:tt)*)] $($rest:tt)*) => {
def_setters! { @parse $(#[$attrs])*; $($rest)* }
};
(@parse $(#[$attrs:meta])*; #[$attr:meta] $($rest:tt)*) => {
def_setters! { @parse $(#[$attrs])* #[$attr]; $($rest)* }
};
(@parse $(#[$attr:meta])*; $setter:ident: Option<$t:ty> = $_default:expr, $($rest:tt)*) => {
$(#[$attr])*
pub fn $setter(&mut self, $setter: impl Into<Option<$t>>) -> &mut Self {
self.inner.$setter = $setter.into();
self
}
def_setters! { $($rest)* }
};
(@parse $(#[$attr:meta])*; $setter:ident: $t:ty = $_default:expr, $($rest:tt)*) => {
$(#[$attr])*
pub fn $setter(&mut self, $setter: $t) -> &mut Self {
self.inner.$setter = $setter;
self
}
def_setters! { $($rest)* }
};
(@parse $($rest:tt)*) => {
compile_error!(concat!("invalid macro call: ", stringify!({ @parse $($rest)* })));
};
($($body:tt)+) => {
def_setters! { @parse; $($body)* }
};
() => {};
}
def_stream! {
#[derive(Clone, Debug)]
pub struct TwitterStreamBuilder<'a, T = Token> {
method: RequestMethod,
endpoint: Uri,
token: T;
#[oauth1(skip)]
timeout: Option<Duration> = Some(Duration::from_secs(90)),
#[oauth1(skip_if = "not")]
stall_warnings: bool = false,
#[oauth1(option)]
filter_level: Option<FilterLevel> = None,
#[oauth1(option)]
language: Option<&'a str> = None,
#[oauth1(option, encoded, fmt = "fmt_follow")]
follow: Option<&'a [u64]> = None,
#[oauth1(option)]
track: Option<&'a str> = None,
#[oauth1(encoded, option, fmt = "fmt_locations")]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
locations: Option<&'a [((f64, f64), (f64, f64))]> = None,
#[oauth1(encoded, option)]
count: Option<i32> = None,
}
pub struct FutureTwitterStream {
response: MaybeTimeout<ResponseFuture>,
}
pub struct TwitterStream {
inner: Lines<MaybeGzip<MaybeTimeoutStream<Body>>>,
}
-
pub fn filter(POST, "https://stream.twitter.com/1.1/statuses/filter.json");
-
pub fn sample(GET, "https://stream.twitter.com/1.1/statuses/sample.json");
}
impl<'a, C, A> TwitterStreamBuilder<'a, Token<C, A>>
where
C: Borrow<str>,
A: Borrow<str>,
{
fn listen_with_client_<Conn, B>(&self, c: &Client<Conn, B>) -> FutureTwitterStream
where
Conn: Connect + Sync + 'static,
Conn::Transport: 'static,
Conn::Future: 'static,
B: Default + From<Vec<u8>> + Payload + Send + 'static,
B::Data: Send,
{
let mut req = Request::builder();
req.method(self.method.clone())
.header(ACCEPT_ENCODING, HeaderValue::from_static("chunked,gzip"));
let req = if RequestMethod::POST == self.method {
let oauth::Request {
authorization,
data,
} = self.inner.authorize_form(
"POST",
&self.endpoint,
self.token.consumer_key.borrow(),
self.token.consumer_secret.borrow(),
self.token.access_secret.borrow(),
oauth::HmacSha1,
&*oauth::Options::new().token(self.token.access_key.borrow()),
);
req.uri(self.endpoint.clone())
.header(AUTHORIZATION, Bytes::from(authorization))
.header(
CONTENT_TYPE,
HeaderValue::from_static("application/x-www-form-urlencoded"),
).header(CONTENT_LENGTH, Bytes::from(data.len().to_string()))
.body(data.into_bytes().into())
.unwrap()
} else {
let oauth::Request {
authorization,
data: uri,
} = self.inner.authorize(
self.method.as_ref(),
&self.endpoint,
self.token.consumer_key.borrow(),
self.token.consumer_secret.borrow(),
self.token.access_secret.borrow(),
oauth::HmacSha1,
&*oauth::Options::new().token(self.token.access_key.borrow()),
);;
req.uri(uri)
.header(AUTHORIZATION, Bytes::from(authorization))
.body(B::default())
.unwrap()
};
let res = c.request(req);
FutureTwitterStream {
response: timeout(res, self.inner.timeout),
}
}
}
impl Future for FutureTwitterStream {
type Item = TwitterStream;
type Error = Error;
fn poll(&mut self) -> Poll<TwitterStream, Error> {
let res = try_ready!(self.response.poll());
let (
Parts {
status, headers, ..
},
body,
) = res.into_parts();
if StatusCode::OK != status {
return Err(Error::Http(status));
}
let body = timeout_to_stream(&self.response, body);
let use_gzip = headers
.get_all(CONTENT_ENCODING)
.iter()
.any(|e| e == "gzip");
let inner = if use_gzip {
Lines::new(MaybeGzip::gzip(body))
} else {
Lines::new(MaybeGzip::identity(body))
};
Ok(TwitterStream { inner }.into())
}
}
impl Stream for TwitterStream {
type Item = string::String<Bytes>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<string::String<Bytes>>, Error> {
loop {
match try_ready!(self.inner.poll()) {
Some(line) => {
let all_ws = line
.iter()
.all(|&c| c == b'\n' || c == b'\r' || c == b' ' || c == b'\t');
if !all_ws {
let line = string::String::<Bytes>::try_from(line).map_err(Error::Utf8)?;
return Ok(Some(line).into());
}
}
None => return Ok(None.into()),
}
}
}
}