twitter-stream-message 0.3.0

Types for Twitter Streaming API's messages.
Documentation
//! Messages from Streaming API.

mod event;
mod warning;

pub use self::event::{Event, EventKind};
pub use self::warning::{Warning, WarningCode};

use std::borrow::Cow;
use std::fmt;

use serde::de::{
    Deserialize,
    Deserializer,
    Error as SerdeError,
    IgnoredAny,
    MapAccess,
    Unexpected,
    Visitor,
};
use serde::de::value::MapAccessDeserializer;

use DirectMessage;
use tweet::{StatusId, Tweet};
use types::{JsonMap, JsonValue};
use user::UserId;
use util::{CowStr, MapAccessChain};

/// Represents a message from Twitter Streaming API.
///
/// # Reference
///
/// 1. [Streaming message types — Twitter Developers](https://dev.twitter.com/streaming/overview/messages-types)
#[derive(Clone, Debug, PartialEq)]
pub enum StreamMessage<'a> {
    /// Tweet
    Tweet(Box<Tweet<'a>>),

    /// Notifications about non-Tweet events.
    Event(Box<Event<'a>>),

    /// Indicate that a given Tweet has been deleted.
    Delete(Delete),

    /// Indicate that geolocated data must be stripped from a range of Tweets.
    ScrubGeo(ScrubGeo),

    /// Indicate that a filtered stream has matched more Tweets than
    /// its current rate limit allows to be delivered, noticing a total count of
    /// the number of undelivered Tweets since the connection was opened.
    Limit(Limit),

    /// Indicate that a given tweet has had its content withheld.
    StatusWithheld(StatusWithheld<'a>),

    /// Indicate that a user has had their content withheld.
    UserWithheld(UserWithheld<'a>),

    /// This message is sent when a stream is disconnected,
    /// indicating why the stream was closed.
    Disconnect(Disconnect<'a>),

    /// Variout warning message
    Warning(Warning<'a>),

    /// List of the user's friends.
    /// Only be sent upon establishing a User Stream connection.
    Friends(Friends),

    // TODO: deserialize `friends_str` into `Friends`
    // FriendsStr(Vec<String>),

    /// Direct message
    DirectMessage(Box<DirectMessage<'a>>),

    /// A [control URI][1] for Site Streams.
    /// [1]: https://dev.twitter.com/streaming/sitestreams/controlstreams
    Control(Control<'a>),

    /// An [envelope][1] for Site Stream.
    /// [1]: https://dev.twitter.com/streaming/overview/messages-types#envelopes_for_user
    ForUser(UserId, Box<StreamMessage<'a>>),

    // ForUserStr(String, Box<StreamMessage>),

    /// A message not known to this library.
    Custom(JsonMap<String, JsonValue>),
}

/// Represents a deleted Tweet.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct Delete {
    pub id: StatusId,
    pub user_id: UserId,
}

/// Represents a range of Tweets whose geolocated data must be stripped.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct ScrubGeo {
    pub user_id: UserId,
    pub up_to_status_id: StatusId,
}

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct Limit {
    pub track: u64,
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct StatusWithheld<'a> {
    pub id: StatusId,
    pub user_id: UserId,
    #[serde(borrow)]
    #[serde(deserialize_with = "::util::deserialize_vec_cow_str")]
    pub withheld_in_countries: Vec<Cow<'a, str>>,
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct UserWithheld<'a> {
    pub id: UserId,
    #[serde(borrow)]
    #[serde(deserialize_with = "::util::deserialize_vec_cow_str")]
    pub withheld_in_countries: Vec<Cow<'a, str>>,
}

/// Indicates why a stream was closed.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
pub struct Disconnect<'a> {
    pub code: DisconnectCode,

    #[serde(borrow)]
    pub stream_name: Cow<'a, str>,

    #[serde(borrow)]
    pub reason: Cow<'a, str>,
}

macro_rules! number_enum {
    (
        $(#[$attr:meta])*
        pub enum $E:ident {
            $(
                $(#[$v_attr:meta])*
                $V:ident = $n:expr,
            )*
        }
    ) => {
        $(#[$attr])*
        pub enum $E {
            $(
                $(#[$v_attr])*
                $V = $n,
            )*
        }

        impl<'x> Deserialize<'x> for $E {
            fn deserialize<D: Deserializer<'x>>(d: D)
                -> Result<Self, D::Error>
            {
                struct NEVisitor;

                impl<'x> Visitor<'x> for NEVisitor {
                    type Value = $E;

                    fn visit_u64<E: SerdeError>(self, v: u64) -> Result<$E, E> {
                        match v {
                            $($n => Ok($E::$V),)*
                            _ => Err(
                                E::invalid_value(Unexpected::Unsigned(v), &self)
                            ),
                        }
                    }

                    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                        write!(f, concat!(
                            "one of the following integers: ", $($n, ','),*)
                        )
                    }
                }

                d.deserialize_u64(NEVisitor)
            }
        }

        impl AsRef<str> for $E {
            fn as_ref(&self) -> &str {
                match *self {
                    $($E::$V => stringify!($V),)*
                }
            }
        }
    };
}

number_enum! {
    /// Status code for a `Disconnect` message.
    #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
    pub enum DisconnectCode {
        /// The feed was shutdown (possibly a machine restart).
        Shutdown = 1,
        /// The same endpoint was connected too many times.
        DuplicateStream = 2,
        /// Control streams was used to close a stream (applies to sitestreams).
        ControlRequest = 3,
        /// The client was reading too slowly and was disconnected by the server.
        Stall = 4,
        /// The client appeared to have initiated a disconnect.
        Normal = 5,
        /// An oauth token was revoked for a user
        /// (applies to site and userstreams).
        TokenRevoked = 6,
        /// The same credentials were used to connect a new stream
        /// and the oldest was disconnected.
        AdminLogout = 7,
        // Reserved for internal use. Will not be delivered to external clients.
        // _ = 8,
        /// The stream connected with a negative count parameter
        /// and was disconnected after all backfill was delivered.
        MaxMessageLimit = 9,
        /// An internal issue disconnected the stream.
        StreamException = 10,
        /// An internal issue disconnected the stream.
        BrokerStall = 11,
        /// The host the stream was connected to became overloaded
        /// and streams were disconnected to balance load. Reconnect as usual.
        ShedLoad = 12,
    }
}

/// Represents a control message.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)]
pub struct Control<'a> {
    #[serde(borrow)]
    control_uri: Cow<'a, str>,
}

pub type Friends = Vec<UserId>;

impl<'a> StreamMessage<'a> {
    /// Parse a JSON string returned from Twitter Streaming API.
    ///
    /// Note that this method is not a member of the `FromStr` trait. It is
    /// because the method requires the lifetime information of the JSON string,
    /// while `FromStr::from_str` does not take a lifetime parameter.
    ///
    /// ```
    /// use twitter_stream_message::message::{Delete, StreamMessage};
    ///
    /// let parsed = StreamMessage::from_str(r#"{
    ///     "delete":{
    ///         "status":{
    ///             "id":1234,
    ///             "id_str":"1234",
    ///             "user_id":3,
    ///             "user_id_str":"3"
    ///         }
    ///     }
    /// }"#).unwrap();
    /// let expected = StreamMessage::Delete(Delete {
    ///     id: 1234,
    ///     user_id: 3,
    /// });
    ///
    /// assert_eq!(parsed, expected);
    #[cfg_attr(feature = "cargo-clippy", allow(should_implement_trait))]
    pub fn from_str(json: &'a str) -> ::Result<Self> {
        ::json::from_str(json)
    }
}

impl<'de: 'a, 'a> Deserialize<'de> for StreamMessage<'a> {
    fn deserialize<D: Deserializer<'de>>(deserializer: D)
        -> Result<Self, D::Error>
    {
        struct SMVisitor;

        impl<'a> Visitor<'a> for SMVisitor {
            type Value = StreamMessage<'a>;

            fn visit_map<A: MapAccess<'a>>(self, mut a: A)
                -> Result<StreamMessage<'a>, A::Error>
            {
                let mut key = match a.next_key::<CowStr>()? {
                    Some(k) => k,
                    None => return Ok(StreamMessage::Custom(JsonMap::new())),
                };

                let ret = match &*key {
                    "delete" => Some(
                        a.next_value().map(StreamMessage::Delete)
                    ),
                    "scrub_geo" => Some(
                        a.next_value().map(StreamMessage::ScrubGeo)
                    ),
                    "limit" => Some(
                        a.next_value().map(StreamMessage::Limit)
                    ),
                    "status_withheld" => Some(
                        a.next_value().map(StreamMessage::StatusWithheld)
                    ),
                    "user_withheld" => Some(
                        a.next_value().map(StreamMessage::UserWithheld)
                    ),
                    "disconnect" => Some(
                        a.next_value().map(StreamMessage::Disconnect)
                    ),
                    "warning" => Some(
                        a.next_value().map(StreamMessage::Warning)
                    ),
                    "friends" => Some(
                        a.next_value().map(StreamMessage::Friends)
                    ),
                    // "friends_str" => Some(
                    //     a.next_value().map(StreamMessage::Friends)
                    // ),
                    "direct_message" => Some(
                        a.next_value().map(StreamMessage::DirectMessage)
                    ),
                    "control" => Some(
                        a.next_value().map(StreamMessage::Control)
                    ),
                    _ => None,
                };

                if let Some(ret) = ret {
                    if ret.is_ok() {
                        while a.next_entry::<IgnoredAny,IgnoredAny>()?.is_some()
                        {}
                    }
                    return ret;
                }

                // Tweet, Event or for_user envelope:

                let mut keys = Vec::new();
                let mut vals = Vec::new();

                loop {
                    match &*key {
                        "id" => {
                            let keys = keys.into_iter().chain(Some(key.0));
                            let a = MapAccessChain::new(keys, vals, a);
                            let de = MapAccessDeserializer::new(a);
                            return Tweet::deserialize(de)
                                .map(Box::new)
                                .map(StreamMessage::Tweet);
                        },
                        "event" => {
                            let keys = keys.into_iter().chain(Some(key.0));
                            let a = MapAccessChain::new(keys, vals, a);
                            let de = MapAccessDeserializer::new(a);
                            return Event::deserialize(de)
                                .map(Box::new)
                                .map(StreamMessage::Event);
                        },
                        "for_user" => {
                            let id = a.next_value::<u64>()?;

                            if let Some((_, v)) = keys.iter().zip(vals)
                                .find(|&(k, _)| "message" == k)
                            {
                                let ret = StreamMessage::deserialize(v)
                                    .map(|m| {
                                        StreamMessage::ForUser(id, Box::new(m))
                                    })
                                    .map_err(A::Error::custom)?;
                                while a.next_entry::<IgnoredAny,IgnoredAny>()?
                                    .is_some()
                                {}
                                return Ok(ret);
                            }

                            while let Some(k) = a.next_key::<CowStr>()? {
                                if "message" == &*k {
                                    let ret = a.next_value()
                                        .map(|m| StreamMessage::ForUser(
                                            id,
                                            Box::new(m)
                                        ))?;
                                    while a.next_entry::<
                                        IgnoredAny,
                                        IgnoredAny,
                                    >()?.is_some()
                                    {}
                                    return Ok(ret);
                                }
                                a.next_value::<IgnoredAny>()?;
                            }

                            return Err(A::Error::missing_field("message"));
                        },
                        _ => {
                            keys.push(key.0);
                            vals.push(a.next_value()?);
                            key = if let Some(k) = a.next_key()? {
                                k
                            } else {
                                return Ok(StreamMessage::Custom(
                                    keys.into_iter()
                                        .map(Cow::into_owned)
                                        .zip(vals)
                                        .collect::<JsonMap<_,_>>()
                                ));
                            };
                        },
                    }
                }
            }

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "a map")
            }
        }

        deserializer.deserialize_map(SMVisitor)
    }
}

impl<'x> Deserialize<'x> for Delete {
    fn deserialize<D: Deserializer<'x>>(d: D) -> Result<Self, D::Error> {
        struct DeleteVisitor;

        impl<'x> Visitor<'x> for DeleteVisitor {
            type Value = Delete;

            fn visit_map<A: MapAccess<'x>>(self, mut a: A)
                -> Result<Delete, A::Error>
            {
                #[derive(Deserialize)]
                struct Status { id: StatusId, user_id: UserId };

                while let Some(k) = a.next_key::<CowStr>()? {
                    if "status" == &*k {
                        let Status { id, user_id } = a.next_value()?;
                        while a.next_entry::<IgnoredAny,IgnoredAny>()?.is_some()
                        {}
                        return Ok(Delete { id, user_id });
                    } else {
                        a.next_value::<IgnoredAny>()?;
                    }
                }

                Err(A::Error::missing_field("status"))
            }

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "a map with a field `status` which contains field \
                    `id` and `user_id` of integer type`")
            }
        }

        d.deserialize_map(DeleteVisitor)
    }
}

impl<'a> fmt::Display for Disconnect<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}: {} {}: {}",
            self.stream_name,
            self.code as u32,
            self.code.as_ref(),
            self.reason
        )
    }
}

#[cfg(test)]
mod tests {
    use json;
    use super::*;

    #[test]
    fn parse() {
        let json = include_str!("test_assets/tweet_1.json");
        json::from_str::<StreamMessage>(json).unwrap();
    }

    #[test]
    fn warning() {
        let json = include_str!("test_assets/falling_behind_1.json");
        let message = include_str!("test_assets/falling_behind_1_message.in")
            .into();
        assert_eq!(
            StreamMessage::Warning(Warning {
                message,
                code: WarningCode::FallingBehind(60),
            }),
            json::from_str(json).unwrap()
        )
    }
}