1mod event;
4mod warning;
5
6pub use self::event::{Event, EventKind};
7pub use self::warning::{Warning, WarningCode};
8
9use std::borrow::Cow;
10use std::fmt;
11
12use serde::de::{
13 Deserialize,
14 Deserializer,
15 Error as SerdeError,
16 IgnoredAny,
17 MapAccess,
18 Unexpected,
19 Visitor,
20};
21use serde::de::value::MapAccessDeserializer;
22
23use DirectMessage;
24use tweet::{StatusId, Tweet};
25use types::{JsonMap, JsonValue};
26use user::UserId;
27use util::{CowStr, MapAccessChain};
28
29#[derive(Clone, Debug, PartialEq)]
35pub enum StreamMessage<'a> {
36 Tweet(Box<Tweet<'a>>),
38
39 Event(Box<Event<'a>>),
41
42 Delete(Delete),
44
45 ScrubGeo(ScrubGeo),
47
48 Limit(Limit),
52
53 StatusWithheld(StatusWithheld<'a>),
55
56 UserWithheld(UserWithheld<'a>),
58
59 Disconnect(Disconnect<'a>),
62
63 Warning(Warning<'a>),
65
66 Friends(Friends),
69
70 DirectMessage(Box<DirectMessage<'a>>),
75
76 Control(Control<'a>),
79
80 ForUser(UserId, Box<StreamMessage<'a>>),
83
84 Custom(JsonMap<String, JsonValue>),
88}
89
90#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
92pub struct Delete {
93 pub id: StatusId,
94 pub user_id: UserId,
95}
96
97#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
99pub struct ScrubGeo {
100 pub user_id: UserId,
101 pub up_to_status_id: StatusId,
102}
103
104#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash)]
105pub struct Limit {
106 pub track: u64,
107}
108
109#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
110pub struct StatusWithheld<'a> {
111 pub id: StatusId,
112 pub user_id: UserId,
113 #[serde(borrow)]
114 #[serde(deserialize_with = "::util::deserialize_vec_cow_str")]
115 pub withheld_in_countries: Vec<Cow<'a, str>>,
116}
117
118#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
119pub struct UserWithheld<'a> {
120 pub id: UserId,
121 #[serde(borrow)]
122 #[serde(deserialize_with = "::util::deserialize_vec_cow_str")]
123 pub withheld_in_countries: Vec<Cow<'a, str>>,
124}
125
126#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash)]
128pub struct Disconnect<'a> {
129 pub code: DisconnectCode,
130
131 #[serde(borrow)]
132 pub stream_name: Cow<'a, str>,
133
134 #[serde(borrow)]
135 pub reason: Cow<'a, str>,
136}
137
// Declares a field-less `pub enum` whose variants carry explicit integer
// discriminants, and generates two impls for it:
//
// * `Deserialize`, which asks the deserializer for a `u64` and returns the
//   variant whose discriminant equals that number. Any other number is
//   rejected with an `invalid_value` error that lists the accepted integers.
// * `AsRef<str>`, which returns the variant's name as written in the
//   declaration.
//
// Every variant must be written as `Name = <integer literal>,` (the trailing
// comma is required); attributes, including doc comments, are accepted on the
// enum and on each variant and are passed through unchanged.
macro_rules! number_enum {
    (
        $(#[$attr:meta])*
        pub enum $E:ident {
            $(
                $(#[$v_attr:meta])*
                $V:ident = $n:expr,
            )*
        }
    ) => {
        $(#[$attr])*
        pub enum $E {
            $(
                $(#[$v_attr])*
                $V = $n,
            )*
        }

        impl<'x> Deserialize<'x> for $E {
            fn deserialize<D: Deserializer<'x>>(d: D)
                -> Result<Self, D::Error>
            {
                struct NEVisitor;

                impl<'x> Visitor<'x> for NEVisitor {
                    type Value = $E;

                    fn visit_u64<E: SerdeError>(self, v: u64) -> Result<$E, E> {
                        match v {
                            $($n => Ok($E::$V),)*
                            _ => Err(
                                E::invalid_value(Unexpected::Unsigned(v), &self)
                            ),
                        }
                    }

                    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                        // The list is written at run time so that the values
                        // are separated by ", " with no dangling separator
                        // after the last one.
                        let accepted: &[u64] = &[$($n),*];
                        f.write_str("one of the following integers: ")?;
                        for (i, n) in accepted.iter().enumerate() {
                            if i > 0 {
                                f.write_str(", ")?;
                            }
                            write!(f, "{}", n)?;
                        }
                        Ok(())
                    }
                }

                d.deserialize_u64(NEVisitor)
            }
        }

        impl AsRef<str> for $E {
            fn as_ref(&self) -> &str {
                match *self {
                    $($E::$V => stringify!($V),)*
                }
            }
        }
    };
}
194
195number_enum! {
196 #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
198 pub enum DisconnectCode {
199 Shutdown = 1,
201 DuplicateStream = 2,
203 ControlRequest = 3,
205 Stall = 4,
207 Normal = 5,
209 TokenRevoked = 6,
212 AdminLogout = 7,
215 MaxMessageLimit = 9,
220 StreamException = 10,
222 BrokerStall = 11,
224 ShedLoad = 12,
227 }
228}
229
230#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)]
232pub struct Control<'a> {
233 #[serde(borrow)]
234 control_uri: Cow<'a, str>,
235}
236
/// Payload of a `friends` message: a list of user ids.
pub type Friends = Vec<UserId>;
238
impl<'a> StreamMessage<'a> {
    /// Parses a JSON string into a `StreamMessage` by delegating to
    /// `::json::from_str`.
    ///
    /// The returned message shares the lifetime `'a` of `json`, so it may
    /// hold borrows into that string.
    ///
    /// This is an inherent method rather than an implementation of
    /// `std::str::FromStr` (hence the silenced clippy lint): `FromStr` has no
    /// way to tie the lifetime of the result to the input string.
    #[cfg_attr(feature = "cargo-clippy", allow(should_implement_trait))]
    pub fn from_str(json: &'a str) -> ::Result<Self> {
        ::json::from_str(json)
    }
}
270
271impl<'de: 'a, 'a> Deserialize<'de> for StreamMessage<'a> {
272 fn deserialize<D: Deserializer<'de>>(deserializer: D)
273 -> Result<Self, D::Error>
274 {
275 struct SMVisitor;
276
277 impl<'a> Visitor<'a> for SMVisitor {
278 type Value = StreamMessage<'a>;
279
280 fn visit_map<A: MapAccess<'a>>(self, mut a: A)
281 -> Result<StreamMessage<'a>, A::Error>
282 {
283 let mut key = match a.next_key::<CowStr>()? {
284 Some(k) => k,
285 None => return Ok(StreamMessage::Custom(JsonMap::new())),
286 };
287
288 let ret = match &*key {
289 "delete" => Some(
290 a.next_value().map(StreamMessage::Delete)
291 ),
292 "scrub_geo" => Some(
293 a.next_value().map(StreamMessage::ScrubGeo)
294 ),
295 "limit" => Some(
296 a.next_value().map(StreamMessage::Limit)
297 ),
298 "status_withheld" => Some(
299 a.next_value().map(StreamMessage::StatusWithheld)
300 ),
301 "user_withheld" => Some(
302 a.next_value().map(StreamMessage::UserWithheld)
303 ),
304 "disconnect" => Some(
305 a.next_value().map(StreamMessage::Disconnect)
306 ),
307 "warning" => Some(
308 a.next_value().map(StreamMessage::Warning)
309 ),
310 "friends" => Some(
311 a.next_value().map(StreamMessage::Friends)
312 ),
313 "direct_message" => Some(
317 a.next_value().map(StreamMessage::DirectMessage)
318 ),
319 "control" => Some(
320 a.next_value().map(StreamMessage::Control)
321 ),
322 _ => None,
323 };
324
325 if let Some(ret) = ret {
326 if ret.is_ok() {
327 while a.next_entry::<IgnoredAny,IgnoredAny>()?.is_some()
328 {}
329 }
330 return ret;
331 }
332
333 let mut keys = Vec::new();
336 let mut vals = Vec::new();
337
338 loop {
339 match &*key {
340 "id" => {
341 let keys = keys.into_iter().chain(Some(key.0));
342 let a = MapAccessChain::new(keys, vals, a);
343 let de = MapAccessDeserializer::new(a);
344 return Tweet::deserialize(de)
345 .map(Box::new)
346 .map(StreamMessage::Tweet);
347 },
348 "event" => {
349 let keys = keys.into_iter().chain(Some(key.0));
350 let a = MapAccessChain::new(keys, vals, a);
351 let de = MapAccessDeserializer::new(a);
352 return Event::deserialize(de)
353 .map(Box::new)
354 .map(StreamMessage::Event);
355 },
356 "for_user" => {
357 let id = a.next_value::<u64>()?;
358
359 if let Some((_, v)) = keys.iter().zip(vals)
360 .find(|&(k, _)| "message" == k)
361 {
362 let ret = StreamMessage::deserialize(v)
363 .map(|m| {
364 StreamMessage::ForUser(id, Box::new(m))
365 })
366 .map_err(A::Error::custom)?;
367 while a.next_entry::<IgnoredAny,IgnoredAny>()?
368 .is_some()
369 {}
370 return Ok(ret);
371 }
372
373 while let Some(k) = a.next_key::<CowStr>()? {
374 if "message" == &*k {
375 let ret = a.next_value()
376 .map(|m| StreamMessage::ForUser(
377 id,
378 Box::new(m)
379 ))?;
380 while a.next_entry::<
381 IgnoredAny,
382 IgnoredAny,
383 >()?.is_some()
384 {}
385 return Ok(ret);
386 }
387 a.next_value::<IgnoredAny>()?;
388 }
389
390 return Err(A::Error::missing_field("message"));
391 },
392 _ => {
393 keys.push(key.0);
394 vals.push(a.next_value()?);
395 key = if let Some(k) = a.next_key()? {
396 k
397 } else {
398 return Ok(StreamMessage::Custom(
399 keys.into_iter()
400 .map(Cow::into_owned)
401 .zip(vals)
402 .collect::<JsonMap<_,_>>()
403 ));
404 };
405 },
406 }
407 }
408 }
409
410 fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
411 write!(f, "a map")
412 }
413 }
414
415 deserializer.deserialize_map(SMVisitor)
416 }
417}
418
419impl<'x> Deserialize<'x> for Delete {
420 fn deserialize<D: Deserializer<'x>>(d: D) -> Result<Self, D::Error> {
421 struct DeleteVisitor;
422
423 impl<'x> Visitor<'x> for DeleteVisitor {
424 type Value = Delete;
425
426 fn visit_map<A: MapAccess<'x>>(self, mut a: A)
427 -> Result<Delete, A::Error>
428 {
429 #[derive(Deserialize)]
430 struct Status { id: StatusId, user_id: UserId };
431
432 while let Some(k) = a.next_key::<CowStr>()? {
433 if "status" == &*k {
434 let Status { id, user_id } = a.next_value()?;
435 while a.next_entry::<IgnoredAny,IgnoredAny>()?.is_some()
436 {}
437 return Ok(Delete { id, user_id });
438 } else {
439 a.next_value::<IgnoredAny>()?;
440 }
441 }
442
443 Err(A::Error::missing_field("status"))
444 }
445
446 fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
447 write!(f, "a map with a field `status` which contains field \
448 `id` and `user_id` of integer type`")
449 }
450 }
451
452 d.deserialize_map(DeleteVisitor)
453 }
454}
455
456impl<'a> fmt::Display for Disconnect<'a> {
457 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
458 write!(f, "{}: {} {}: {}",
459 self.stream_name,
460 self.code as u32,
461 self.code.as_ref(),
462 self.reason
463 )
464 }
465}
466
#[cfg(test)]
mod tests {
    use json;
    use super::*;

    /// The `tweet_1.json` asset must deserialize into a `StreamMessage`
    /// without error.
    #[test]
    fn parse() {
        let input = include_str!("test_assets/tweet_1.json");
        let _: StreamMessage = json::from_str(input).unwrap();
    }

    /// The `falling_behind_1.json` asset must deserialize into a `Warning`
    /// with code `FallingBehind(60)` and the message stored in
    /// `falling_behind_1_message.in`.
    #[test]
    fn warning() {
        let input = include_str!("test_assets/falling_behind_1.json");
        let expected = StreamMessage::Warning(Warning {
            message: include_str!("test_assets/falling_behind_1_message.in")
                .into(),
            code: WarningCode::FallingBehind(60),
        });

        let actual: StreamMessage = json::from_str(input).unwrap();

        assert_eq!(expected, actual);
    }
}