extern crate bytes;
#[macro_use]
extern crate futures;
extern crate hyper;
#[macro_use]
extern crate lazy_static;
extern crate oauthcli;
#[cfg(feature = "use-serde")]
extern crate serde;
#[cfg(feature = "use-serde")]
#[macro_use]
extern crate serde_derive;
extern crate tokio_core;
#[cfg(feature = "parse")]
extern crate twitter_stream_message;
extern crate url;
#[macro_use]
mod util;
pub mod error;
#[cfg(feature = "parse")]
pub mod message;
pub mod types;
mod auth;
pub use auth::Token;
pub use error::Error;
use error::HyperError;
use futures::{Future, Poll, Stream};
use hyper::Body;
use hyper::client::{Client, Connect, FutureResponse, Request};
use hyper::header::{Headers, ContentLength, ContentType, UserAgent};
use std::ops::Deref;
use std::borrow::Cow;
use std::time::Duration;
use tokio_core::reactor::Handle;
use types::{FilterLevel, JsonStr, RequestMethod, StatusCode, Url, With};
use url::form_urlencoded::{Serializer, Target};
use util::{BaseTimeout, Lines, TimeoutStream};
// Declares, from one description, a builder struct, a future struct and a
// stream struct, together with the builder's constructors and setters.
//
// Input, in order:
// - the builder struct `$B<$lifetime, $CH>` with its "client or handle"
//   field, a list of required arguments, a list of optional parameters with
//   defaults, and a list of defaulted fields that get no generated setter;
// - the future struct `$FS` and the stream struct `$S`, emitted as written;
// - any number of `pub fn $constructor($Method, $end_point);` entries, each
//   preceded by two attribute lists separated by `-`: the first is attached
//   to the builder constructor, the second to the shortcut on `$S`.
//
// NOTE(review): the expansion relies on things defined outside the macro:
// a `listen` method on `$B<_, Handle>`, and a field named `user_agent`
// among the `$custom_setter` fields (used by the hand-written setter below).
macro_rules! def_stream {
(
// Attributes and header of the builder struct.
$(#[$builder_attr:meta])*
pub struct $B:ident<$lifetime:tt, $CH:ident> {
// The client-or-handle field: name, type and initial value.
$client_or_handle:ident: $ch_ty:ty = $ch_default:expr;
// Required arguments: they become the parameters of `custom` (in this
// order) and each one also gets a setter of the same name.
$(
$(#[$arg_setter_attr:meta])*
:$arg:ident: $a_ty:ty
),*;
// Optional parameters: initialised to `$default`, each gets a setter.
$(
$(#[$setter_attr:meta])*
:$setter:ident: $s_ty:ty = $default:expr
),*;
// Defaulted fields for which the repetition generates no setter.
$(:$custom_setter:ident: $c_ty:ty = $c_default:expr),*;
}
// The future type returned when a connection is started.
$(#[$future_stream_attr:meta])*
pub struct $FS:ident {
$($fs_field:ident: $fsf_ty:ty,)*
}
// The stream type the future resolves to.
$(#[$stream_attr:meta])*
pub struct $S:ident {
$($s_field:ident: $sf_ty:ty,)*
}
// Endpoint constructors: a name, a `RequestMethod` variant and an
// expression that derefs to the endpoint argument.
$(
$(#[$constructor_attr:meta])*
-
$(#[$s_constructor_attr:meta])*
pub fn $constructor:ident($Method:ident, $end_point:expr);
)*
) => {
// NOTE(review): the bound below is written as a literal `'a` rather than
// `$lifetime`, so the macro only works when invoked with `'a` — confirm
// whether that is intended.
$(#[$builder_attr])*
pub struct $B<$lifetime, $CH: 'a> {
$client_or_handle: $ch_ty,
$($arg: $a_ty,)*
$($setter: $s_ty,)*
$($custom_setter: $c_ty,)*
}
$(#[$future_stream_attr])*
pub struct $FS {
$($fs_field: $fsf_ty,)*
}
$(#[$stream_attr])*
pub struct $S {
$($s_field: $sf_ty,)*
}
// Constructors exist only on the `()` flavour of the builder, i.e. before
// a client or a handle has been attached.
impl<$lifetime> $B<$lifetime, ()> {
$(
$(#[$constructor_attr])*
pub fn $constructor(token: &$lifetime Token<$lifetime>) -> $B<$lifetime, ()> {
// Passes (method, end point, token) positionally, so the `$arg` list of
// the invocation has to be declared in exactly that order.
$B::custom(RequestMethod::$Method, $end_point.deref(), token)
}
)*
// Builds a builder from the required arguments; every other field starts
// at its declared default.
pub fn custom($($arg: $a_ty),*) -> $B<$lifetime, ()> {
$B {
$client_or_handle: $ch_default,
$($arg: $arg,)*
$($setter: $default,)*
$($custom_setter: $c_default,)*
}
}
}
impl<$lifetime, $CH> $B<$lifetime, $CH> {
// Consumes the builder and returns one holding `client`; all other fields
// are moved over unchanged.
pub fn client<C, B>(self, client: &$lifetime Client<C, B>) -> $B<$lifetime, Client<C, B>>
where C: Connect, B: From<Vec<u8>> + Stream<Error=HyperError> + 'static, B::Item: AsRef<[u8]>
{
$B {
$client_or_handle: client,
$($arg: self.$arg,)*
$($setter: self.$setter,)*
$($custom_setter: self.$custom_setter,)*
}
}
// Consumes the builder and returns one holding `handle`; all other fields
// are moved over unchanged.
pub fn handle(self, handle: &$lifetime Handle) -> $B<$lifetime, Handle> {
$B {
$client_or_handle: handle,
$($arg: self.$arg,)*
$($setter: self.$setter,)*
$($custom_setter: self.$custom_setter,)*
}
}
// One chainable `&mut self` setter per required argument.
$(
$(#[$arg_setter_attr])*
pub fn $arg(&mut self, $arg: $a_ty) -> &mut Self {
self.$arg = $arg;
self
}
)*
// Hand-written setter for the `user_agent` field: accepts anything that
// converts into `Cow<'static, str>`; `None` clears the value.
pub fn user_agent<T>(&mut self, user_agent: Option<T>) -> &mut Self where T: Into<Cow<'static, str>> {
self.user_agent = user_agent.map(Into::into);
self
}
// One chainable `&mut self` setter per optional parameter.
$(
$(#[$setter_attr])*
pub fn $setter(&mut self, $setter: $s_ty) -> &mut Self {
self.$setter = $setter;
self
}
)*
}
// Shortcuts on the stream type: build with the endpoint constructor, attach
// the handle and start listening right away.
impl $S {
$(
$(#[$s_constructor_attr])*
pub fn $constructor(token: &Token, handle: &Handle) -> $FS {
$B::$constructor(token).handle(handle).listen()
}
)*
}
};
}
// Endpoint URLs handed to the generated constructors. They are parsed lazily
// by `lazy_static`; the literals are fixed, so the `unwrap` only guards
// against a typo in this file.
lazy_static! {
// End point used by the `filter` constructors.
static ref EP_FILTER: Url = Url::parse("https://stream.twitter.com/1.1/statuses/filter.json").unwrap();
// End point used by the `sample` constructors.
static ref EP_SAMPLE: Url = Url::parse("https://stream.twitter.com/1.1/statuses/sample.json").unwrap();
// End point used by the `user` constructors.
static ref EP_USER: Url = Url::parse("https://userstream.twitter.com/1.1/user.json").unwrap();
}
// Initial "client or handle" value of a freshly constructed builder, whose
// `CH` parameter is `()` until `client` or `handle` is called.
const TUPLE_REF: &'static () = &();
def_stream! {
/// A builder for `TwitterStream`.
///
/// Create one with `filter`, `sample`, `user` or `custom`, adjust the
/// parameters through the setters, attach a `Client` or a `Handle` with
/// `client` / `handle`, and call `listen` to start connecting.
#[derive(Clone, Debug)]
pub struct TwitterStreamBuilder<'a, CH> {
// Either a hyper `Client`, a reactor `Handle`, or `()` while neither has
// been supplied yet.
client_or_handle: &'a CH = TUPLE_REF;
/// Reset the HTTP request method used when connecting.
:method: RequestMethod,
/// Reset the URL of the end point to connect to.
:end_point: &'a Url,
/// Reset the token used to build the `Authorization` header.
:token: &'a Token<'a>;
/// Set the timeout duration. It is applied while waiting for the response
/// and then handed over to the body stream; `None` disables it.
///
/// Defaults to 90 seconds.
:timeout: Option<Duration> = Some(Duration::from_secs(90)),
/// When `true`, the `stall_warnings=true` parameter is sent.
:stall_warnings: bool = false,
/// Value of the `filter_level` parameter; nothing is sent for
/// `FilterLevel::None`.
:filter_level: FilterLevel = FilterLevel::None,
/// Value of the `language` parameter, sent as given.
:language: Option<&'a str> = None,
/// IDs sent as the `follow` parameter, joined with commas.
:follow: Option<&'a [u64]> = None,
/// Value of the `track` parameter, sent as given.
:track: Option<&'a str> = None,
/// Pairs of `(longitude, latitude)` points sent as the `locations`
/// parameter, flattened in order and joined with commas.
:locations: Option<&'a [((f64, f64), (f64, f64))]> = None,
/// Value of the `count` parameter.
:count: Option<i32> = None,
/// Value of the `with` parameter.
:with: Option<With> = None,
/// When `true`, the `replies=all` parameter is sent.
:replies: bool = false;
// Value of the `User-Agent` header; its setter is written by hand inside
// `def_stream!` rather than generated from this list.
:user_agent: Option<Cow<'static, str>> = None;
}
/// A future returned by `listen` and by the constructors of
/// `TwitterStream`. It resolves to a `TwitterStream`.
pub struct FutureTwitterStream {
inner: FutureTwitterStreamInner,
}
/// A stream of the non-blank lines of the response body, yielded as
/// `JsonStr`.
pub struct TwitterStream {
inner: Lines<TimeoutStream<Body>>,
}
/// Create a builder that sends a `POST` request to the
/// `statuses/filter` end point.
-
/// Shorthand for `TwitterStreamBuilder::filter(token).handle(handle).listen()`.
pub fn filter(Post, EP_FILTER);
/// Create a builder that sends a `GET` request to the
/// `statuses/sample` end point.
-
/// Shorthand for `TwitterStreamBuilder::sample(token).handle(handle).listen()`.
pub fn sample(Get, EP_SAMPLE);
/// Create a builder that sends a `GET` request to the `user` end point.
-
/// Shorthand for `TwitterStreamBuilder::user(token).handle(handle).listen()`.
pub fn user(Get, EP_USER);
}
/// Internal state of `FutureTwitterStream`.
enum FutureTwitterStreamInner {
/// The request has been handed to a client.
Ok {
/// The pending HTTP response.
inner: FutureResponse,
/// Timeout to enforce, if any. It is taken out of the `Option` once the
/// response has arrived and is then applied to the body stream.
timeout: Option<BaseTimeout>,
},
/// The default connector could not be created. The error is taken out on
/// the first poll; polling again panics.
Err(Option<error::TlsError>),
}
impl<'a, C, B> TwitterStreamBuilder<'a, Client<C, B>>
where C: Connect, B: From<Vec<u8>> + Stream<Error=HyperError> + 'static, B::Item: AsRef<[u8]>
{
pub fn listen(&self) -> FutureTwitterStream {
FutureTwitterStream {
inner: FutureTwitterStreamInner::Ok {
inner: self.connect(self.client_or_handle),
timeout: self.timeout.and_then(|dur| BaseTimeout::new(dur, self.client_or_handle.handle().clone())),
},
}
}
}
impl<'a> TwitterStreamBuilder<'a, Handle> {
    /// Builds a `Client` around the crate's default connector, sends the
    /// request through it and returns a future that resolves to a
    /// `TwitterStream`.
    ///
    /// If the connector cannot be created, the returned future carries that
    /// error instead and no request is sent.
    pub fn listen(&self) -> FutureTwitterStream {
        let handle = self.client_or_handle;

        let state = match default_connector::new(handle) {
            Err(e) => FutureTwitterStreamInner::Err(Some(e)),
            Ok(connector) => {
                let client = Client::configure().connector(connector).build(handle);
                let response = self.connect(&client);
                let timeout = self.timeout.and_then(|dur| BaseTimeout::new(dur, handle.clone()));
                FutureTwitterStreamInner::Ok {
                    inner: response,
                    timeout: timeout,
                }
            },
        };

        FutureTwitterStream { inner: state }
    }
}
impl<'a, _CH> TwitterStreamBuilder<'a, _CH> {
fn connect<C, B>(&self, c: &Client<C, B>) -> FutureResponse
where C: Connect, B: From<Vec<u8>> + Stream<Error=HyperError> + 'static, B::Item: AsRef<[u8]>
{
let mut url = self.end_point.clone();
let mut headers = Headers::new();
if let Some(ref ua) = self.user_agent {
headers.set(UserAgent::new(ua.clone()));
}
if RequestMethod::Post == self.method {
use hyper::mime;
let mut body = Serializer::new(String::new());
self.append_query_pairs(&mut body);
let body = body.finish();
headers.set(auth::create_authorization_header(self.token, &self.method, &url, Some(body.as_ref())));
headers.set(ContentType(mime::APPLICATION_WWW_FORM_URLENCODED));
headers.set(ContentLength(body.len() as u64));
let mut req = Request::new(RequestMethod::Post, url.as_ref().parse().unwrap());
*req.headers_mut() = headers;
req.set_body(body.into_bytes());
c.request(req)
} else {
self.append_query_pairs(&mut url.query_pairs_mut());
headers.set(auth::create_authorization_header(self.token, &self.method, &url, None));
let mut req = Request::new(self.method.clone(), url.as_ref().parse().unwrap());
*req.headers_mut() = headers;
c.request(req)
}
}
fn append_query_pairs<T: Target>(&self, pairs: &mut Serializer<T>) {
if self.stall_warnings {
pairs.append_pair("stall_warnings", "true");
}
if self.filter_level != FilterLevel::None {
pairs.append_pair("filter_level", self.filter_level.as_ref());
}
if let Some(s) = self.language {
pairs.append_pair("language", s);
}
if let Some(ids) = self.follow {
let mut val = String::new();
if let Some(id) = ids.first() {
val = id.to_string();
}
for id in ids.into_iter().skip(1) {
val.push(',');
val.push_str(&id.to_string());
}
pairs.append_pair("follow", &val);
}
if let Some(s) = self.track {
pairs.append_pair("track", s);
}
if let Some(locs) = self.locations {
let mut val = String::new();
macro_rules! push {
($coordinate:expr) => {{
val.push(',');
val.push_str(&$coordinate.to_string());
}};
}
if let Some(&((lon1, lat1), (lon2, lat2))) = locs.first() {
val = lon1.to_string();
push!(lat1);
push!(lon2);
push!(lat2);
}
for &((lon1, lat1), (lon2, lat2)) in locs.into_iter().skip(1) {
push!(lon1);
push!(lat1);
push!(lon2);
push!(lat2);
}
pairs.append_pair("locations", &val);
}
if let Some(n) = self.count {
pairs.append_pair("count", &n.to_string());
}
if let Some(ref w) = self.with {
pairs.append_pair("with", w.as_ref());
}
if self.replies {
pairs.append_pair("replies", "all");
}
}
}
impl Future for FutureTwitterStream {
type Item = TwitterStream;
type Error = Error;
fn poll(&mut self) -> Poll<TwitterStream, Error> {
use futures::Async;
match self.inner {
FutureTwitterStreamInner::Ok { ref mut inner, ref mut timeout } => match inner.poll().map_err(Error::Hyper)?
{
Async::Ready(res) => {
let status = res.status();
if StatusCode::Ok != status {
return Err(Error::Http(status));
}
let body = match timeout.take() {
Some(timeout) => timeout.for_stream(res.body()),
None => TimeoutStream::never(res.body()),
};
Ok(TwitterStream {
inner: Lines::new(body),
}.into())
},
Async::NotReady => {
if let Some(ref mut timeout) = *timeout {
match timeout.timer_mut().poll() {
Ok(Async::Ready(())) => return Err(Error::TimedOut),
Ok(Async::NotReady) => (),
Err(_) => unreachable!(), }
}
Ok(Async::NotReady)
},
},
FutureTwitterStreamInner::Err(ref mut e) => Err(
Error::Tls(e.take().expect("cannot poll FutureTwitterStream twice"))
),
}
}
}
impl Stream for TwitterStream {
type Item = JsonStr;
type Error = Error;
fn poll(&mut self) -> Poll<Option<JsonStr>, Error> {
loop {
match try_ready!(self.inner.poll()) {
Some(line) => {
if ! line.iter().all(|&c| c == b'\n' || c == b'\r' || c == b' ' || c == b'\t') {
let line = JsonStr::from_utf8(line).map_err(Error::Utf8)?;
return Ok(Some(line).into());
}
},
None => return Ok(None.into()),
}
}
}
}
/// Default connector backed by `hyper_tls`, compiled when the `tls` feature
/// is enabled.
#[cfg(feature = "tls")]
mod default_connector {
    extern crate hyper_tls;
    extern crate native_tls;

    use hyper::client::HttpConnector;
    use tokio_core::reactor::Handle;
    use self::hyper_tls::HttpsConnector;

    pub use self::native_tls::Error;

    /// Creates an HTTPS connector bound to `handle`.
    // NOTE(review): the literal `1` is presumably the number of DNS worker
    // threads — confirm against the `hyper_tls` version in use.
    pub fn new(handle: &Handle) -> Result<HttpsConnector<HttpConnector>, Error> {
        HttpsConnector::new(1, handle)
    }
}
/// Default connector backed by `hyper_openssl`, compiled when `tls-openssl`
/// is enabled and neither `tls` nor `tls-rustls` is.
// NOTE(review): no `default_connector` covering `tls-rustls` without `tls`
// is visible in this part of the file — confirm one exists elsewhere.
#[cfg(all(feature = "tls-openssl", not(any(feature = "tls", feature = "tls-rustls"))))]
mod default_connector {
    extern crate hyper_openssl;

    use hyper::client::HttpConnector;
    use tokio_core::reactor::Handle;
    use self::hyper_openssl::HttpsConnector;

    pub use self::hyper_openssl::openssl::error::ErrorStack as Error;

    /// Creates an HTTPS connector bound to `handle`.
    pub fn new(handle: &Handle) -> Result<HttpsConnector<HttpConnector>, Error> {
        HttpsConnector::new(1, handle)
    }
}
#[cfg(not(any(feature = "tls", feature = "tls-rustls", feature = "tls-openssl")))]
mod default_connector {
pub use util::Never as Error;
use hyper::client::HttpConnector;
#[cold]
pub fn new(h: &::tokio_core::reactor::Handle) -> Result<HttpConnector, Error> {
Ok(HttpConnector::new(1, h))
}
}