use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::{self, io};
use futures::Stream;
use hyper::client::ResponseFuture;
use hyper::{Body, Request};
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json;
use crate::auth::Token;
use crate::common::*;
use crate::tweet::Tweet;
use crate::{error, links};
#[derive(Debug)]
pub enum StreamMessage {
Ping,
FriendList(Vec<u64>),
Tweet(Tweet),
Delete {
status_id: u64,
user_id: u64,
},
ScrubGeo {
user_id: u64,
up_to_status_id: u64,
},
StatusWithheld {
status_id: u64,
user_id: u64,
withheld_in_countries: Vec<String>,
},
UserWithheld {
user_id: u64,
withheld_in_countries: Vec<String>,
},
Disconnect(u64, String),
Unknown(serde_json::Value),
}
impl<'de> Deserialize<'de> for StreamMessage {
fn deserialize<D>(deser: D) -> Result<StreamMessage, D::Error>
where
D: Deserializer<'de>,
{
macro_rules! fetch {
($input: ident, $key: expr) => {
$input
.get($key)
.and_then(|val| serde_json::from_value(val.clone()).ok())
.ok_or_else(|| D::Error::custom("Failed"))
};
}
let input = serde_json::Value::deserialize(deser)?;
let msg = if let Some(del) = input.get("delete").and_then(|d| d.get("status")) {
StreamMessage::Delete {
status_id: fetch!(del, "id")?,
user_id: fetch!(del, "user_id")?,
}
} else if let Some(scrub) = input.get("scrub_geo") {
StreamMessage::ScrubGeo {
user_id: fetch!(scrub, "user_id")?,
up_to_status_id: fetch!(scrub, "up_to_status_id")?,
}
} else if let Some(tweet) = input.get("status_withheld") {
StreamMessage::StatusWithheld {
status_id: fetch!(tweet, "id")?,
user_id: fetch!(tweet, "user_id")?,
withheld_in_countries: fetch!(tweet, "withheld_in_countries")?,
}
} else if let Some(user) = input.get("user_withheld") {
StreamMessage::UserWithheld {
user_id: fetch!(user, "id")?,
withheld_in_countries: fetch!(user, "withheld_in_countries")?,
}
} else if let Some(err) = input.get("disconnect") {
StreamMessage::Disconnect(fetch!(err, "code")?, fetch!(err, "reason")?)
} else if let Some(friends) = input.get("friends") {
StreamMessage::FriendList(
serde_json::from_value(friends.clone())
.map_err(|e| D::Error::custom(format!("{}", e)))?,
)
} else if let Ok(tweet) = serde_json::from_value::<Tweet>(input.clone()) {
StreamMessage::Tweet(tweet)
} else {
StreamMessage::Unknown(input.clone())
};
Ok(msg)
}
}
impl FromStr for StreamMessage {
type Err = error::Error;
fn from_str(input: &str) -> Result<Self, error::Error> {
let input = input.trim();
if input.is_empty() {
Ok(StreamMessage::Ping)
} else {
Ok(serde_json::from_str(input)?)
}
}
}
#[must_use = "Streams are lazy and do nothing unless polled"]
pub struct TwitterStream {
buf: Vec<u8>,
request: Option<Request<Body>>,
response: Option<ResponseFuture>,
body: Option<Body>,
}
impl TwitterStream {
pub(crate) fn new(request: Request<Body>) -> TwitterStream {
TwitterStream {
buf: vec![],
request: Some(request),
response: None,
body: None,
}
}
}
impl Stream for TwitterStream {
type Item = Result<StreamMessage, error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Some(req) = self.request.take() {
self.response = Some(get_response(req));
}
if let Some(mut resp) = self.response.take() {
match Pin::new(&mut resp).poll(cx) {
Poll::Pending => {
self.response = Some(resp);
return Poll::Pending;
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(resp)) => {
let status = resp.status();
if !status.is_success() {
return Poll::Ready(Some(Err(error::Error::BadStatus(status))));
}
self.body = Some(resp.into_body());
}
}
}
if let Some(mut body) = self.body.take() {
loop {
match Pin::new(&mut body).poll_next(cx) {
Poll::Pending => {
self.body = Some(body);
return Poll::Pending;
}
Poll::Ready(None) => {
return Poll::Ready(None);
}
Poll::Ready(Some(Err(e))) => {
self.body = Some(body);
return Poll::Ready(Some(Err(e.into())));
}
Poll::Ready(Some(Ok(chunk))) => {
self.buf.extend(&*chunk);
if let Some(pos) = self.buf.windows(2).position(|w| w == b"\r\n") {
self.body = Some(body);
let pos = pos + 2;
let resp = if let Ok(msg_str) = std::str::from_utf8(&self.buf[..pos]) {
StreamMessage::from_str(msg_str)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
)
.into())
};
self.buf.drain(..pos);
return Poll::Ready(Some(Ok(resp?)));
}
}
}
}
} else {
Poll::Ready(Some(Err(error::Error::FutureAlreadyCompleted)))
}
}
}
#[derive(Copy, Clone, Debug, Deserialize, Serialize)]
pub enum FilterLevel {
#[serde(rename = "none")]
None,
#[serde(rename = "low")]
Low,
#[serde(rename = "medium")]
Medium,
}
impl ::std::fmt::Display for FilterLevel {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
match *self {
FilterLevel::None => write!(f, "none"),
FilterLevel::Low => write!(f, "low"),
FilterLevel::Medium => write!(f, "medium"),
}
}
}
pub struct StreamBuilder {
url: &'static str,
follow: Vec<u64>,
track: Vec<String>,
language: Vec<String>,
locations: Vec<BoundingBox>,
filter_level: Option<FilterLevel>,
}
impl StreamBuilder {
fn new(url: &'static str) -> Self {
StreamBuilder {
url,
follow: Vec::new(),
track: Vec::new(),
language: Vec::new(),
locations: Vec::new(),
filter_level: None,
}
}
pub fn follow(mut self, to_follow: &[u64]) -> Self {
self.follow.extend(to_follow.iter());
self
}
pub fn track<I: IntoIterator<Item = S>, S: AsRef<str>>(mut self, to_track: I) -> Self {
self.track
.extend(to_track.into_iter().map(|s| s.as_ref().to_string()));
self
}
pub fn language<I: IntoIterator<Item = S>, S: AsRef<str>>(mut self, languages: I) -> Self {
self.language
.extend(languages.into_iter().map(|s| s.as_ref().to_string()));
self
}
pub fn locations(mut self, locations: &[BoundingBox]) -> Self {
self.locations.extend(locations.iter());
self
}
pub fn filter_level(self, filter_level: FilterLevel) -> StreamBuilder {
StreamBuilder {
filter_level: Some(filter_level),
..self
}
}
pub fn start(self, token: &Token) -> TwitterStream {
let mut params =
ParamList::new().add_opt_param("filter_level", self.filter_level.map_string());
if !self.follow.is_empty() {
let to_follow = self
.follow
.iter()
.map(|id| id.to_string())
.collect::<Vec<String>>()
.join(",");
params.add_param_ref("follow", to_follow);
}
if !self.track.is_empty() {
let to_track = self.track.join(",");
params.add_param_ref("track", to_track);
}
if !self.language.is_empty() {
let langs = self.language.join(",");
params.add_param_ref("language", langs);
}
if !self.locations.is_empty() {
let locs = self
.locations
.iter()
.map(|bb| bb.to_string())
.collect::<Vec<String>>()
.join(",");
params.add_param_ref("locations", locs);
}
let req = post(self.url, token, Some(¶ms));
TwitterStream::new(req)
}
}
pub fn filter() -> StreamBuilder {
StreamBuilder::new(links::stream::FILTER)
}
pub fn sample(token: &Token) -> TwitterStream {
let req = get(links::stream::SAMPLE, token, None);
TwitterStream::new(req)
}
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct BoundingBox {
southwest: (f64, f64),
northeast: (f64, f64),
}
impl ::std::fmt::Display for BoundingBox {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
write!(
f,
"{},{},{},{}",
self.southwest.0, self.southwest.1, self.northeast.0, self.northeast.1
)
}
}
impl BoundingBox {
pub fn new(southwest: (f64, f64), northeast: (f64, f64)) -> BoundingBox {
BoundingBox {
southwest,
northeast,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::tests::load_file;
fn load_stream(path: &str) -> StreamMessage {
let sample = load_file(path);
::serde_json::from_str(&sample).unwrap()
}
#[test]
fn parse_tweet_stream() {
let msg = load_stream("sample_payloads/sample-stream.json");
if let StreamMessage::Tweet(_tweet) = msg {
} else {
panic!("Not a tweet")
}
}
#[test]
fn parse_empty_stream() {
let msg = StreamMessage::from_str("").unwrap();
if let StreamMessage::Ping = msg {
} else {
panic!("Not a ping")
}
}
}