twitter_stream_message/message/
mod.rs

1//! Messages from Streaming API.
2
3mod event;
4mod warning;
5
6pub use self::event::{Event, EventKind};
7pub use self::warning::{Warning, WarningCode};
8
9use std::borrow::Cow;
10use std::fmt;
11
12use serde::de::{
13    Deserialize,
14    Deserializer,
15    Error as SerdeError,
16    IgnoredAny,
17    MapAccess,
18    Unexpected,
19    Visitor,
20};
21use serde::de::value::MapAccessDeserializer;
22
23use DirectMessage;
24use tweet::{StatusId, Tweet};
25use types::{JsonMap, JsonValue};
26use user::UserId;
27use util::{CowStr, MapAccessChain};
28
29/// Represents a message from Twitter Streaming API.
30///
31/// # Reference
32///
33/// 1. [Streaming message types — Twitter Developers](https://dev.twitter.com/streaming/overview/messages-types)
34#[derive(Clone, Debug, PartialEq)]
35pub enum StreamMessage<'a> {
36    /// Tweet
37    Tweet(Box<Tweet<'a>>),
38
39    /// Notifications about non-Tweet events.
40    Event(Box<Event<'a>>),
41
42    /// Indicate that a given Tweet has been deleted.
43    Delete(Delete),
44
45    /// Indicate that geolocated data must be stripped from a range of Tweets.
46    ScrubGeo(ScrubGeo),
47
48    /// Indicate that a filtered stream has matched more Tweets than
49    /// its current rate limit allows to be delivered, noticing a total count of
50    /// the number of undelivered Tweets since the connection was opened.
51    Limit(Limit),
52
53    /// Indicate that a given tweet has had its content withheld.
54    StatusWithheld(StatusWithheld<'a>),
55
56    /// Indicate that a user has had their content withheld.
57    UserWithheld(UserWithheld<'a>),
58
59    /// This message is sent when a stream is disconnected,
60    /// indicating why the stream was closed.
61    Disconnect(Disconnect<'a>),
62
63    /// Variout warning message
64    Warning(Warning<'a>),
65
66    /// List of the user's friends.
67    /// Only be sent upon establishing a User Stream connection.
68    Friends(Friends),
69
70    // TODO: deserialize `friends_str` into `Friends`
71    // FriendsStr(Vec<String>),
72
73    /// Direct message
74    DirectMessage(Box<DirectMessage<'a>>),
75
76    /// A [control URI][1] for Site Streams.
77    /// [1]: https://dev.twitter.com/streaming/sitestreams/controlstreams
78    Control(Control<'a>),
79
80    /// An [envelope][1] for Site Stream.
81    /// [1]: https://dev.twitter.com/streaming/overview/messages-types#envelopes_for_user
82    ForUser(UserId, Box<StreamMessage<'a>>),
83
84    // ForUserStr(String, Box<StreamMessage>),
85
86    /// A message not known to this library.
87    Custom(JsonMap<String, JsonValue>),
88}
89
90/// Represents a deleted Tweet.
91#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
92pub struct Delete {
93    pub id: StatusId,
94    pub user_id: UserId,
95}
96
97/// Represents a range of Tweets whose geolocated data must be stripped.
98#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
99pub struct ScrubGeo {
100    pub user_id: UserId,
101    pub up_to_status_id: StatusId,
102}
103
104#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
105pub struct Limit {
106    pub track: u64,
107}
108
109#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
110pub struct StatusWithheld<'a> {
111    pub id: StatusId,
112    pub user_id: UserId,
113    #[serde(borrow)]
114    #[serde(deserialize_with = "::util::deserialize_vec_cow_str")]
115    pub withheld_in_countries: Vec<Cow<'a, str>>,
116}
117
118#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
119pub struct UserWithheld<'a> {
120    pub id: UserId,
121    #[serde(borrow)]
122    #[serde(deserialize_with = "::util::deserialize_vec_cow_str")]
123    pub withheld_in_countries: Vec<Cow<'a, str>>,
124}
125
126/// Indicates why a stream was closed.
127#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
128pub struct Disconnect<'a> {
129    pub code: DisconnectCode,
130
131    #[serde(borrow)]
132    pub stream_name: Cow<'a, str>,
133
134    #[serde(borrow)]
135    pub reason: Cow<'a, str>,
136}
137
138macro_rules! number_enum {
139    (
140        $(#[$attr:meta])*
141        pub enum $E:ident {
142            $(
143                $(#[$v_attr:meta])*
144                $V:ident = $n:expr,
145            )*
146        }
147    ) => {
148        $(#[$attr])*
149        pub enum $E {
150            $(
151                $(#[$v_attr])*
152                $V = $n,
153            )*
154        }
155
156        impl<'x> Deserialize<'x> for $E {
157            fn deserialize<D: Deserializer<'x>>(d: D)
158                -> Result<Self, D::Error>
159            {
160                struct NEVisitor;
161
162                impl<'x> Visitor<'x> for NEVisitor {
163                    type Value = $E;
164
165                    fn visit_u64<E: SerdeError>(self, v: u64) -> Result<$E, E> {
166                        match v {
167                            $($n => Ok($E::$V),)*
168                            _ => Err(
169                                E::invalid_value(Unexpected::Unsigned(v), &self)
170                            ),
171                        }
172                    }
173
174                    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
175                        write!(f, concat!(
176                            "one of the following integers: ", $($n, ','),*)
177                        )
178                    }
179                }
180
181                d.deserialize_u64(NEVisitor)
182            }
183        }
184
185        impl AsRef<str> for $E {
186            fn as_ref(&self) -> &str {
187                match *self {
188                    $($E::$V => stringify!($V),)*
189                }
190            }
191        }
192    };
193}
194
195number_enum! {
196    /// Status code for a `Disconnect` message.
197    #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
198    pub enum DisconnectCode {
199        /// The feed was shutdown (possibly a machine restart).
200        Shutdown = 1,
201        /// The same endpoint was connected too many times.
202        DuplicateStream = 2,
203        /// Control streams was used to close a stream (applies to sitestreams).
204        ControlRequest = 3,
205        /// The client was reading too slowly and was disconnected by the server.
206        Stall = 4,
207        /// The client appeared to have initiated a disconnect.
208        Normal = 5,
209        /// An oauth token was revoked for a user
210        /// (applies to site and userstreams).
211        TokenRevoked = 6,
212        /// The same credentials were used to connect a new stream
213        /// and the oldest was disconnected.
214        AdminLogout = 7,
215        // Reserved for internal use. Will not be delivered to external clients.
216        // _ = 8,
217        /// The stream connected with a negative count parameter
218        /// and was disconnected after all backfill was delivered.
219        MaxMessageLimit = 9,
220        /// An internal issue disconnected the stream.
221        StreamException = 10,
222        /// An internal issue disconnected the stream.
223        BrokerStall = 11,
224        /// The host the stream was connected to became overloaded
225        /// and streams were disconnected to balance load. Reconnect as usual.
226        ShedLoad = 12,
227    }
228}
229
230/// Represents a control message.
231#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)]
232pub struct Control<'a> {
233    #[serde(borrow)]
234    control_uri: Cow<'a, str>,
235}
236
237pub type Friends = Vec<UserId>;
238
239impl<'a> StreamMessage<'a> {
240    /// Parse a JSON string returned from Twitter Streaming API.
241    ///
242    /// Note that this method is not a member of the `FromStr` trait. It is
243    /// because the method requires the lifetime information of the JSON string,
244    /// while `FromStr::from_str` does not take a lifetime parameter.
245    ///
246    /// ```
247    /// use twitter_stream_message::message::{Delete, StreamMessage};
248    ///
249    /// let parsed = StreamMessage::from_str(r#"{
250    ///     "delete":{
251    ///         "status":{
252    ///             "id":1234,
253    ///             "id_str":"1234",
254    ///             "user_id":3,
255    ///             "user_id_str":"3"
256    ///         }
257    ///     }
258    /// }"#).unwrap();
259    /// let expected = StreamMessage::Delete(Delete {
260    ///     id: 1234,
261    ///     user_id: 3,
262    /// });
263    ///
264    /// assert_eq!(parsed, expected);
265    #[cfg_attr(feature = "cargo-clippy", allow(should_implement_trait))]
266    pub fn from_str(json: &'a str) -> ::Result<Self> {
267        ::json::from_str(json)
268    }
269}
270
271impl<'de: 'a, 'a> Deserialize<'de> for StreamMessage<'a> {
272    fn deserialize<D: Deserializer<'de>>(deserializer: D)
273        -> Result<Self, D::Error>
274    {
275        struct SMVisitor;
276
277        impl<'a> Visitor<'a> for SMVisitor {
278            type Value = StreamMessage<'a>;
279
280            fn visit_map<A: MapAccess<'a>>(self, mut a: A)
281                -> Result<StreamMessage<'a>, A::Error>
282            {
283                let mut key = match a.next_key::<CowStr>()? {
284                    Some(k) => k,
285                    None => return Ok(StreamMessage::Custom(JsonMap::new())),
286                };
287
288                let ret = match &*key {
289                    "delete" => Some(
290                        a.next_value().map(StreamMessage::Delete)
291                    ),
292                    "scrub_geo" => Some(
293                        a.next_value().map(StreamMessage::ScrubGeo)
294                    ),
295                    "limit" => Some(
296                        a.next_value().map(StreamMessage::Limit)
297                    ),
298                    "status_withheld" => Some(
299                        a.next_value().map(StreamMessage::StatusWithheld)
300                    ),
301                    "user_withheld" => Some(
302                        a.next_value().map(StreamMessage::UserWithheld)
303                    ),
304                    "disconnect" => Some(
305                        a.next_value().map(StreamMessage::Disconnect)
306                    ),
307                    "warning" => Some(
308                        a.next_value().map(StreamMessage::Warning)
309                    ),
310                    "friends" => Some(
311                        a.next_value().map(StreamMessage::Friends)
312                    ),
313                    // "friends_str" => Some(
314                    //     a.next_value().map(StreamMessage::Friends)
315                    // ),
316                    "direct_message" => Some(
317                        a.next_value().map(StreamMessage::DirectMessage)
318                    ),
319                    "control" => Some(
320                        a.next_value().map(StreamMessage::Control)
321                    ),
322                    _ => None,
323                };
324
325                if let Some(ret) = ret {
326                    if ret.is_ok() {
327                        while a.next_entry::<IgnoredAny,IgnoredAny>()?.is_some()
328                        {}
329                    }
330                    return ret;
331                }
332
333                // Tweet, Event or for_user envelope:
334
335                let mut keys = Vec::new();
336                let mut vals = Vec::new();
337
338                loop {
339                    match &*key {
340                        "id" => {
341                            let keys = keys.into_iter().chain(Some(key.0));
342                            let a = MapAccessChain::new(keys, vals, a);
343                            let de = MapAccessDeserializer::new(a);
344                            return Tweet::deserialize(de)
345                                .map(Box::new)
346                                .map(StreamMessage::Tweet);
347                        },
348                        "event" => {
349                            let keys = keys.into_iter().chain(Some(key.0));
350                            let a = MapAccessChain::new(keys, vals, a);
351                            let de = MapAccessDeserializer::new(a);
352                            return Event::deserialize(de)
353                                .map(Box::new)
354                                .map(StreamMessage::Event);
355                        },
356                        "for_user" => {
357                            let id = a.next_value::<u64>()?;
358
359                            if let Some((_, v)) = keys.iter().zip(vals)
360                                .find(|&(k, _)| "message" == k)
361                            {
362                                let ret = StreamMessage::deserialize(v)
363                                    .map(|m| {
364                                        StreamMessage::ForUser(id, Box::new(m))
365                                    })
366                                    .map_err(A::Error::custom)?;
367                                while a.next_entry::<IgnoredAny,IgnoredAny>()?
368                                    .is_some()
369                                {}
370                                return Ok(ret);
371                            }
372
373                            while let Some(k) = a.next_key::<CowStr>()? {
374                                if "message" == &*k {
375                                    let ret = a.next_value()
376                                        .map(|m| StreamMessage::ForUser(
377                                            id,
378                                            Box::new(m)
379                                        ))?;
380                                    while a.next_entry::<
381                                        IgnoredAny,
382                                        IgnoredAny,
383                                    >()?.is_some()
384                                    {}
385                                    return Ok(ret);
386                                }
387                                a.next_value::<IgnoredAny>()?;
388                            }
389
390                            return Err(A::Error::missing_field("message"));
391                        },
392                        _ => {
393                            keys.push(key.0);
394                            vals.push(a.next_value()?);
395                            key = if let Some(k) = a.next_key()? {
396                                k
397                            } else {
398                                return Ok(StreamMessage::Custom(
399                                    keys.into_iter()
400                                        .map(Cow::into_owned)
401                                        .zip(vals)
402                                        .collect::<JsonMap<_,_>>()
403                                ));
404                            };
405                        },
406                    }
407                }
408            }
409
410            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
411                write!(f, "a map")
412            }
413        }
414
415        deserializer.deserialize_map(SMVisitor)
416    }
417}
418
419impl<'x> Deserialize<'x> for Delete {
420    fn deserialize<D: Deserializer<'x>>(d: D) -> Result<Self, D::Error> {
421        struct DeleteVisitor;
422
423        impl<'x> Visitor<'x> for DeleteVisitor {
424            type Value = Delete;
425
426            fn visit_map<A: MapAccess<'x>>(self, mut a: A)
427                -> Result<Delete, A::Error>
428            {
429                #[derive(Deserialize)]
430                struct Status { id: StatusId, user_id: UserId };
431
432                while let Some(k) = a.next_key::<CowStr>()? {
433                    if "status" == &*k {
434                        let Status { id, user_id } = a.next_value()?;
435                        while a.next_entry::<IgnoredAny,IgnoredAny>()?.is_some()
436                        {}
437                        return Ok(Delete { id, user_id });
438                    } else {
439                        a.next_value::<IgnoredAny>()?;
440                    }
441                }
442
443                Err(A::Error::missing_field("status"))
444            }
445
446            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
447                write!(f, "a map with a field `status` which contains field \
448                    `id` and `user_id` of integer type`")
449            }
450        }
451
452        d.deserialize_map(DeleteVisitor)
453    }
454}
455
456impl<'a> fmt::Display for Disconnect<'a> {
457    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
458        write!(f, "{}: {} {}: {}",
459            self.stream_name,
460            self.code as u32,
461            self.code.as_ref(),
462            self.reason
463        )
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use json;
470    use super::*;
471
472    #[test]
473    fn parse() {
474        let json = include_str!("test_assets/tweet_1.json");
475        json::from_str::<StreamMessage>(json).unwrap();
476    }
477
478    #[test]
479    fn warning() {
480        let json = include_str!("test_assets/falling_behind_1.json");
481        let message = include_str!("test_assets/falling_behind_1_message.in")
482            .into();
483        assert_eq!(
484            StreamMessage::Warning(Warning {
485                message,
486                code: WarningCode::FallingBehind(60),
487            }),
488            json::from_str(json).unwrap()
489        )
490    }
491}