#![doc(html_root_url = "https://docs.rs/twitter-stream/0.13.0")]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(broken_intra_doc_links)]
#![warn(missing_docs)]
#[macro_use]
mod util;
pub mod builder;
pub mod error;
#[cfg(feature = "hyper")]
#[cfg_attr(docsrs, doc(cfg(feature = "hyper")))]
pub mod hyper;
pub mod service;
#[doc(no_inline)]
pub use oauth_credentials::Credentials;
pub use crate::builder::Builder;
pub use crate::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::str;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_core::{ready, Stream};
use http::Response;
use http::StatusCode;
use http_body::Body;
use pin_project_lite::pin_project;
use crate::util::Lines;
pin_project! {
/// A future that resolves to a [`TwitterStream`] once the wrapped response future completes.
///
/// The `Future` implementation in this file polls the inner `response` future, reports its
/// failure as `Error::Service`, rejects any HTTP status other than `200 OK` with
/// `Error::Http`, and otherwise wraps the response body in a [`TwitterStream`].
pub struct FutureTwitterStream<F> {
// The in-flight response future. Marked `#[pin]` so that `project()` hands out a
// `Pin<&mut F>` that can be polled in place.
#[pin]
response: F,
}
}
pin_project! {
/// A stream over a response body that yields one UTF-8 string per non-blank line.
///
/// The `Stream` implementation in this file pulls items from the inner `Lines<B>`, skips
/// those consisting solely of JSON whitespace, and fails with `Error::Utf8` on an item
/// that is not valid UTF-8.
pub struct TwitterStream<B> {
// Splits the body `B` into lines (see `crate::util::Lines`). Marked `#[pin]` so that
// `project()` hands out a `Pin<&mut Lines<B>>` that can be polled in place.
#[pin]
inner: Lines<B>,
}
}
/// Alias for `oauth_credentials::Token<C, T>` with both type parameters defaulting to `String`.
///
/// This is the type of the `token` argument accepted by the constructor methods in this
/// crate root (e.g. `TwitterStream::builder`).
// NOTE(review): presumably `C` is the string type of the client credentials and `T` that of
// the token credentials — confirm against the `oauth_credentials` documentation.
pub type Token<C = String, T = String> = oauth_credentials::Token<C, T>;
impl<B> TwitterStream<B>
where
    B: Body,
{
    /// Creates a [`Builder`] that owns the given `token`.
    ///
    /// This is a thin convenience wrapper: it forwards `token` unchanged to `Builder::new`
    /// and returns the resulting builder for further configuration by the caller.
    pub fn builder<'a, C: AsRef<str>, A: AsRef<str>>(
        token: Token<C, A>,
    ) -> Builder<'a, Token<C, A>> {
        Builder::new(token)
    }
}
#[cfg(feature = "hyper")]
impl crate::hyper::TwitterStream {
    /// Starts listening with the `follow` parameter set.
    ///
    /// Shorthand for building a [`Builder`] from a borrowed view of `token`, calling
    /// `follow(follow)` on it and then `listen()`.
    // NOTE(review): `follow` is presumably a list of user IDs whose Tweets should be
    // delivered — confirm against `Builder::follow`.
    pub fn follow<C: AsRef<str>, A: AsRef<str>>(
        follow: &[u64],
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        let borrowed = token.as_ref();
        Builder::new(borrowed).follow(follow).listen()
    }

    /// Starts listening with the `track` parameter set.
    ///
    /// Shorthand for building a [`Builder`] from a borrowed view of `token`, calling
    /// `track(track)` on it and then `listen()`.
    // NOTE(review): `track` is presumably a phrase list used to filter Tweets — confirm
    // against `Builder::track`.
    pub fn track<C: AsRef<str>, A: AsRef<str>>(
        track: &str,
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        let borrowed = token.as_ref();
        Builder::new(borrowed).track(track).listen()
    }

    /// Starts listening with the `locations` parameter set.
    ///
    /// Shorthand for building a [`Builder`] from a borrowed view of `token`, calling
    /// `locations(locations)` on it and then `listen()`.
    pub fn locations<C: AsRef<str>, A: AsRef<str>>(
        locations: &[builder::BoundingBox],
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        let borrowed = token.as_ref();
        Builder::new(borrowed).locations(locations).listen()
    }

    /// Starts listening without setting any filter parameter.
    ///
    /// Shorthand for building a [`Builder`] from a borrowed view of `token` and calling
    /// `listen()` on it with the builder's defaults.
    pub fn sample<C: AsRef<str>, A: AsRef<str>>(
        token: &Token<C, A>,
    ) -> crate::hyper::FutureTwitterStream {
        let borrowed = token.as_ref();
        Builder::new(borrowed).listen()
    }
}
impl<F, B, E> Future for FutureTwitterStream<F>
where
F: Future<Output = Result<Response<B>, E>>,
B: Body,
{
type Output = Result<TwitterStream<B>, Error<E>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = ready!(self.project().response.poll(cx).map_err(Error::Service)?);
if res.status() != StatusCode::OK {
return Poll::Ready(Err(Error::Http(res.status())));
}
let inner = Lines::new(res.into_body());
Poll::Ready(Ok(TwitterStream { inner }))
}
}
impl<B> Stream for TwitterStream<B>
where
B: Body,
{
type Item = Result<string::String<Bytes>, Error<B::Error>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let line = match ready!(this.inner.as_mut().poll_next(cx)?) {
Some(t) => t,
None => return std::task::Poll::Ready(None),
};
if line.iter().all(|&c| is_json_whitespace(c)) {
continue;
}
str::from_utf8(&line).map_err(Error::Utf8)?;
let line = unsafe {
string::String::<Bytes>::from_utf8_unchecked(line)
};
return Poll::Ready(Some(Ok(line)));
}
}
}
/// Returns `true` if `c` is a space, horizontal tab, line feed or carriage return.
///
/// These four bytes are the insignificant whitespace of the JSON grammar.
fn is_json_whitespace(c: u8) -> bool {
    matches!(c, b' ' | b'\t' | b'\n' | b'\r')
}