1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`.
use ;
use ;
use Service;
/// This is a `futures::Stream` of responses resulting from calling the wrapped `tower::Service`
/// for each request received on the wrapped `Stream`.
///
/// ```rust
/// # extern crate futures;
/// # extern crate tower_service;
/// # extern crate tokio_mock_task;
/// # extern crate tower;
/// # use futures::future::{ok, FutureResult};
/// # use futures::{Async, Poll};
/// # use std::cell::Cell;
/// # use std::error::Error;
/// # use std::rc::Rc;
/// #
/// use futures::Stream;
/// use tower_service::Service;
/// use tower::ServiceExt;
///
/// // First, we need to have a Service to process our requests.
/// #[derive(Debug, Eq, PartialEq)]
/// struct FirstLetter;
/// impl Service<&'static str> for FirstLetter {
/// type Response = &'static str;
/// type Error = Box<Error + Send + Sync>;
/// type Future = FutureResult<Self::Response, Self::Error>;
///
/// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
/// Ok(Async::Ready(()))
/// }
///
/// fn call(&mut self, req: &'static str) -> Self::Future {
/// ok(&req[..1])
/// }
/// }
///
/// # fn main() {
/// # let mut mock = tokio_mock_task::MockTask::new();
/// // Next, we need a Stream of requests.
/// let (reqs, rx) = futures::unsync::mpsc::unbounded();
/// // Note that we have to help Rust out here by telling it what error type to use.
/// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
/// let rsps = FirstLetter.call_all(rx.map_err(|_| "boom"));
///
/// // Now, let's send a few requests and then check that we get the corresponding responses.
/// reqs.unbounded_send("one");
/// reqs.unbounded_send("two");
/// reqs.unbounded_send("three");
/// drop(reqs);
///
/// // We then loop over the response Stream that we get back from call_all.
/// # // a little bit of trickery here since we don't have an executor
/// # /*
/// let mut iter = rsps.wait();
/// # */
/// # let mut iter = mock.enter(|| rsps.wait());
/// for (i, rsp) in (&mut iter).enumerate() {
/// // Since we used .wait(), each response is a Result.
/// match (i + 1, rsp.unwrap()) {
/// (1, "o") |
/// (2, "t") |
/// (3, "t") => {}
/// (n, i) => {
/// unreachable!("{}. response was '{}'", n, i);
/// }
/// }
/// }
///
/// // And at the end, we can get the Service back when there are no more requests.
/// let rsps = iter.into_inner();
/// assert_eq!(rsps.into_inner(), FirstLetter);
/// # }
/// ```