1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//! Contains helper dispatches that ease working with common protocols, like `Primitive` or
//! `Streaming`.

use futures::Future;
use futures::sync::mpsc::UnboundedSender;
use futures::sync::oneshot;

use serde::Deserialize;

use {Dispatch, Error, Response};
use protocol::{self, Flatten, Primitive};

/// A single-shot dispatch wraps the given oneshot sender and implements `Primitive` protocol
/// emitting either value or error.
///
/// The majority of services adheres such protocol.
#[derive(Debug)]
pub struct PrimitiveDispatch<T> {
    tx: oneshot::Sender<Result<T, Error>>,
}

impl<T> PrimitiveDispatch<T> {
    /// Constructs a `PrimitiveDispatch` by wrapping the specified oneshot sender.
    pub fn new(tx: oneshot::Sender<Result<T, Error>>) -> Self {
        Self { tx: tx }
    }

    /// Constructs a `PrimitiveDispatch` paired with a future of result.
    ///
    /// The future returned will be resolved at a time when an incoming message is consumed by the
    /// dispatch.
    pub fn pair() -> (Self, impl Future<Item = T, Error = Error>) {
        let (tx, rx) = oneshot::channel();
        let result = PrimitiveDispatch::new(tx);
        let future = rx.map_err(|e| e.into()).then(|r| r.and_then(|v| v));

        (result, future)
    }
}

impl<T: for<'de> Deserialize<'de> + Send> Dispatch for PrimitiveDispatch<T> {
    fn process(self: Box<Self>, response: &Response) -> Option<Box<Dispatch>> {
        let result = response.deserialize::<Primitive<T>>()
            .flatten();
        drop(self.tx.send(result));

        None
    }

    fn discard(self: Box<Self>, err: &Error) {
        drop(self.tx.send(Err(err.clone())));
    }
}

/// A streaming dispatch wraps the given stream and implements `Streaming` protocol, emitting
/// either chunk, error or close events as usual stream events.
#[derive(Debug)]
pub struct StreamingDispatch<T> {
    tx: UnboundedSender<Result<T, Error>>,
}

impl<T> StreamingDispatch<T> {
    /// Constructs a `StreamingDispatch` by wrapping the specified sender.
    pub fn new(tx: UnboundedSender<Result<T, Error>>) -> Self {
        Self { tx: tx }
    }

    fn send(self: Box<Self>, result: Result<T, Error>) -> Option<Box<Dispatch>>
        where T: for<'de> Deserialize<'de> + Send + 'static
    {
        if self.tx.unbounded_send(result).is_ok() {
            Some(self)
        } else {
            None
        }
    }
}

impl<T: for<'de> Deserialize<'de> + Send + 'static> Dispatch for StreamingDispatch<T> {
    fn process(self: Box<Self>, response: &Response) -> Option<Box<Dispatch>> {
        match response.deserialize::<protocol::Streaming<T>>().flatten() {
            Ok(Some(data)) => self.send(Ok(data)),
            Ok(None) => None,
            Err(err) => self.send(Err(err)),
        }
    }

    fn discard(self: Box<Self>, err: &Error) {
        drop(self.tx.unbounded_send(Err(err.clone())))
    }
}