mod event;
mod warning;
pub use self::event::{Event, EventKind};
pub use self::warning::{Warning, WarningCode};
use {DirectMessage};
use serde::de::{Deserialize, Deserializer, Error, IgnoredAny, MapAccess, Unexpected, Visitor};
use std::fmt;
use tweet::{StatusId, Tweet};
use types::{JsonMap, JsonValue};
use user::UserId;
/// A single message decoded from a stream.
///
/// The `Deserialize` impl in this module chooses the variant by looking at the keys of the
/// incoming map: a recognised first key selects the matching typed variant, otherwise the whole
/// map is collected and inspected for `id`, `event` or `for_user`, falling back to `Custom`.
#[derive(Clone, Debug, PartialEq)]
pub enum StreamMessage {
/// A map that contains an `id` key, decoded as a `Tweet`.
Tweet(Box<Tweet>),
/// A map that contains an `event` key (and no `id` key), decoded as an `Event`.
Event(Box<Event>),
/// Value found under the `delete` key.
Delete(Delete),
/// Value found under the `scrub_geo` key.
ScrubGeo(ScrubGeo),
/// Value found under the `limit` key.
Limit(Limit),
/// Value found under the `status_withheld` key.
StatusWithheld(StatusWithheld),
/// Value found under the `user_withheld` key.
UserWithheld(UserWithheld),
/// Value found under the `disconnect` key.
Disconnect(Disconnect),
/// Value found under the `warning` key.
Warning(Warning),
/// Value found under the `friends` key.
Friends(Friends),
/// Value found under the `direct_message` key.
DirectMessage(Box<DirectMessage>),
/// Value found under the `control` key.
Control(Control),
/// An envelope with a numeric `for_user` id and a nested `message`, which is itself decoded
/// as a `StreamMessage`.
ForUser(UserId, Box<StreamMessage>),
/// Any other map, kept as raw JSON. An empty map also ends up here.
Custom(JsonMap<String, JsonValue>),
}
/// Payload of a `delete` stream message.
///
/// `Deserialize` is implemented by hand further down in this module: both fields are read from
/// a nested `status` object rather than from the top level of the payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct Delete {
/// The `id` field of the nested `status` object.
pub id: StatusId,
/// The `user_id` field of the nested `status` object.
pub user_id: UserId,
}
/// Payload of a `scrub_geo` stream message.
// NOTE(review): presumably asks the client to drop location data from the user's statuses up to
// and including `up_to_status_id` — confirm against the upstream API documentation.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct ScrubGeo {
/// The `user_id` field of the payload.
pub user_id: UserId,
/// The `up_to_status_id` field of the payload.
pub up_to_status_id: StatusId,
}
/// Payload of a `limit` stream message.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct Limit {
/// The `track` field of the payload.
// NOTE(review): presumably a count of messages withheld because of rate limiting — confirm.
pub track: u64,
}
/// Payload of a `status_withheld` stream message.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct StatusWithheld {
/// The `id` field of the payload (a status id).
pub id: StatusId,
/// The `user_id` field of the payload.
pub user_id: UserId,
/// The `withheld_in_countries` field of the payload.
// NOTE(review): presumably country codes — the format is not checked here.
pub withheld_in_countries: Vec<String>,
}
/// Payload of a `user_withheld` stream message.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct UserWithheld {
/// The `id` field of the payload (a user id).
pub id: UserId,
/// The `withheld_in_countries` field of the payload.
// NOTE(review): presumably country codes — the format is not checked here.
pub withheld_in_countries: Vec<String>,
}
/// Payload of a `disconnect` stream message.
///
/// The `Display` impl in this module renders it as
/// `<stream_name>: <numeric code> <code name>: <reason>`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct Disconnect {
/// Numeric disconnect code, decoded into a `DisconnectCode`.
pub code: DisconnectCode,
/// The `stream_name` field of the payload.
pub stream_name: String,
/// The `reason` field of the payload.
pub reason: String,
}
// Defines a public enum whose variants carry explicit integer discriminants and generates two
// impls for it:
//   * `Deserialize`, which accepts exactly the listed unsigned integers, and
//   * `AsRef<str>`, which returns the variant's identifier exactly as written in the invocation.
//
// In the invocation every variant is written as `:Name = value,` (note the leading colon);
// attributes may precede both the enum and each variant and are forwarded unchanged.
macro_rules! number_enum {
(
$(#[$attr:meta])*
pub enum $E:ident {
$(
$(#[$v_attr:meta])*
:$V:ident = $n:expr,
)*
}
) => {
// Re-emit the enum itself, dropping the leading colon of each variant.
$(#[$attr])*
pub enum $E {
$(
$(#[$v_attr])*
$V = $n,
)*
}
impl<'x> Deserialize<'x> for $E {
fn deserialize<D: Deserializer<'x>>(d: D) -> Result<Self, D::Error> {
struct NEVisitor;
impl<'x> Visitor<'x> for NEVisitor {
type Value = $E;
// Map the integer back to its variant; a value that is not listed is reported as an
// invalid value.
fn visit_u64<E: Error>(self, v: u64) -> Result<$E, E> {
match v {
$($n => Ok($E::$V),)*
_ => Err(E::invalid_value(Unexpected::Unsigned(v), &self)),
}
}
// The message is assembled at compile time. Every listed value is followed by a comma,
// so the resulting text ends with a trailing comma.
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, concat!("one of the following integers: ", $($n, ','),*))
}
}
// Ask the deserializer for an unsigned integer.
d.deserialize_u64(NEVisitor)
}
}
impl AsRef<str> for $E {
// Returns the variant name as a string, e.g. `"Shutdown"` for a variant named `Shutdown`.
fn as_ref(&self) -> &str {
match *self {
$($E::$V => stringify!($V),)*
}
}
}
};
}
// Numeric code carried by a `Disconnect` message.
//
// Generated through `number_enum!`, so it deserializes from the integers listed below and
// `as_ref()` yields the variant name. The value 8 is not defined here and is therefore
// rejected during deserialization.
number_enum! {
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum DisconnectCode {
:Shutdown = 1,
:DuplicateStream = 2,
:ControlRequest = 3,
:Stall = 4,
:Normal = 5,
:TokenRevoked = 6,
:AdminLogout = 7,
:MaxMessageLimit = 9,
:StreamException = 10,
:BrokerStall = 11,
:ShedLoad = 12,
}
}
/// Payload of a `control` stream message.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)]
pub struct Control {
    /// The `control_uri` field of the payload.
    ///
    /// Public like the fields of the other payload structs in this module; without this, code
    /// outside this module tree could receive a `Control` but never read the URI it carries.
    pub control_uri: String,
}
/// Payload of a `friends` stream message: a list of user ids.
pub type Friends = Vec<UserId>;
impl<'x> Deserialize<'x> for StreamMessage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'x> {
struct SMVisitor;
impl<'x> Visitor<'x> for SMVisitor {
type Value = StreamMessage;
fn visit_map<V>(self, mut v: V) -> Result<StreamMessage, V::Error> where V: MapAccess<'x> {
let key = match v.next_key::<String>()? {
Some(k) => k,
None => return Ok(StreamMessage::Custom(JsonMap::new())),
};
let ret = match key.as_str() {
"delete" => Some(v.next_value().map(StreamMessage::Delete)),
"scrub_geo" => Some(v.next_value().map(StreamMessage::ScrubGeo)),
"limit" => Some(v.next_value().map(StreamMessage::Limit)),
"status_withheld" => Some(v.next_value().map(StreamMessage::StatusWithheld)),
"user_withheld" => Some(v.next_value().map(StreamMessage::UserWithheld)),
"disconnect" => Some(v.next_value().map(StreamMessage::Disconnect)),
"warning" => Some(v.next_value().map(StreamMessage::Warning)),
"friends" => Some(v.next_value().map(StreamMessage::Friends)),
"direct_message" => Some(v.next_value().map(StreamMessage::DirectMessage)),
"control" => Some(v.next_value().map(StreamMessage::Control)),
_ => None,
};
if let Some(ret) = ret {
if ret.is_ok() {
while v.next_entry::<IgnoredAny,IgnoredAny>()?.is_some() {}
}
return ret;
}
let mut map = JsonMap::new();
map.insert(key, v.next_value()?);
while let Some((k, v)) = v.next_entry()? {
map.insert(k, v);
}
if map.contains_key("id") {
Tweet::deserialize(JsonValue::Object(map))
.map(Box::new)
.map(StreamMessage::Tweet)
.map_err(|e| V::Error::custom(e.to_string()))
} else if map.contains_key("event") {
Event::deserialize(JsonValue::Object(map))
.map(Box::new)
.map(StreamMessage::Event)
.map_err(|e| V::Error::custom(e.to_string()))
} else if let Some(id) = map.remove("for_user") {
if let JsonValue::Number(id) = id {
if let Some(id) = id.as_u64() {
if let Some(msg) = map.remove("message") {
StreamMessage::deserialize(msg)
.map(|m| StreamMessage::ForUser(id, Box::new(m)))
.map_err(|e| V::Error::custom(e.to_string()))
} else {
Err(V::Error::missing_field("message"))
}
} else {
Err(V::Error::custom("expected u64"))
}
} else {
Err(V::Error::custom("expected u64"))
}
} else {
Ok(StreamMessage::Custom(map))
}
}
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "map")
}
}
deserializer.deserialize_map(SMVisitor)
}
}
impl<'x> Deserialize<'x> for Delete {
fn deserialize<D: Deserializer<'x>>(d: D) -> Result<Self, D::Error> {
struct DeleteVisitor;
impl<'x> Visitor<'x> for DeleteVisitor {
type Value = Delete;
fn visit_map<V: MapAccess<'x>>(self, mut v: V) -> Result<Delete, V::Error> {
use std::mem;
#[allow(dead_code)]
#[derive(Deserialize)]
struct Status { id: StatusId, user_id: UserId };
while let Some(k) = v.next_key::<String>()? {
if "status" == k.as_str() {
let ret = v.next_value::<Status>()?;
while v.next_entry::<IgnoredAny,IgnoredAny>()?.is_some() {}
unsafe {
return Ok(mem::transmute(ret));
}
} else {
v.next_value::<IgnoredAny>()?;
}
}
Err(V::Error::missing_field("status"))
}
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "a map with a field `status` which contains field `id` and `user_id` of integer type`")
}
}
d.deserialize_map(DeleteVisitor)
}
}
impl fmt::Display for Disconnect {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}: {} {}: {}", self.stream_name, self.code as u32, self.code.as_ref(), self.reason)
}
}
#[cfg(test)]
mod tests {
use json;
use super::*;
// A map whose first key is `warning` must decode into `StreamMessage::Warning`, with the
// `FALLING_BEHIND` code picking up the `percent_full` value from the payload.
#[test]
fn warning() {
assert_eq!(
StreamMessage::Warning(Warning {
message: "Your connection is falling behind and messages are being queued for delivery to you. \
Your queue is now over 60% full. You will be disconnected when the queue is full.".to_owned(),
code: WarningCode::FallingBehind(60),
}),
json::from_str(
"{\"warning\":{\"code\":\"FALLING_BEHIND\",\"message\":\"Your connection is falling \
behind and messages are being queued for delivery to you. Your queue is now over 60% full. \
You will be disconnected when the queue is full.\",\"percent_full\": 60}}"
).unwrap()
)
}
}