grpc/
resp.rs

1use futures::future;
2use futures::future::FutureExt;
3use futures::future::TryFutureExt;
4use futures::stream;
5use futures::stream::Stream;
6use futures::stream::StreamExt;
7
8use crate::error;
9
10use crate::futures_grpc::*;
11
12use crate::proto::metadata::Metadata;
13use crate::result;
14use crate::stream_item::*;
15use futures::task::Context;
16use std::future::Future;
17use std::pin::Pin;
18use std::task::Poll;
19
20/// Single message response
21pub struct SingleResponse<T: Send + 'static>(pub GrpcFuture<(Metadata, GrpcFuture<(T, Metadata)>)>);
22
23impl<T: Send + 'static> SingleResponse<T> {
24    // constructors
25
26    pub fn new<F>(f: F) -> SingleResponse<T>
27    where
28        F: Future<Output = crate::Result<(Metadata, GrpcFuture<(T, Metadata)>)>> + Send + 'static,
29    {
30        SingleResponse(Box::pin(f))
31    }
32
33    pub fn metadata_and_future<F>(metadata: Metadata, result: F) -> SingleResponse<T>
34    where
35        F: Future<Output = crate::Result<T>> + Send + 'static,
36    {
37        SingleResponse::metadata_and_future_and_trailing_metadata(
38            metadata,
39            result,
40            future::ok(Metadata::new()),
41        )
42    }
43
44    pub fn metadata_and_future_and_trailing_metadata<F, M>(
45        metadata: Metadata,
46        result: F,
47        trailing: M,
48    ) -> SingleResponse<T>
49    where
50        F: Future<Output = crate::Result<T>> + Send + 'static,
51        M: Future<Output = crate::Result<Metadata>> + Send + 'static,
52    {
53        let future: GrpcFuture<(T, Metadata)> = Box::pin(future::try_join(result, trailing));
54        SingleResponse::new(future::ok((metadata, future)))
55    }
56
57    pub fn completed_with_metadata_and_trailing_metadata(
58        metadata: Metadata,
59        r: T,
60        trailing: Metadata,
61    ) -> SingleResponse<T> {
62        SingleResponse::metadata_and_future_and_trailing_metadata(
63            metadata,
64            future::ok(r),
65            future::ok(trailing),
66        )
67    }
68
69    pub fn completed_with_metadata(metadata: Metadata, r: T) -> SingleResponse<T> {
70        SingleResponse::completed_with_metadata_and_trailing_metadata(metadata, r, Metadata::new())
71    }
72
73    pub fn completed(r: T) -> SingleResponse<T> {
74        SingleResponse::completed_with_metadata_and_trailing_metadata(
75            Metadata::new(),
76            r,
77            Metadata::new(),
78        )
79    }
80
81    pub fn no_metadata<F>(r: F) -> SingleResponse<T>
82    where
83        F: Future<Output = crate::Result<T>> + Send + 'static,
84    {
85        SingleResponse::metadata_and_future(Metadata::new(), r)
86    }
87
88    pub fn err(err: error::Error) -> SingleResponse<T> {
89        SingleResponse::new(future::err(err))
90    }
91
92    // getters
93
94    pub fn join_metadata_result(self) -> GrpcFuture<(Metadata, T, Metadata)> {
95        Box::pin(self.0.and_then(|(initial, result)| {
96            result.map_ok(|(result, trailing)| (initial, result, trailing))
97        }))
98    }
99
100    pub fn drop_metadata(self) -> GrpcFuture<T> {
101        Box::pin(
102            self.0
103                .and_then(|(_initial, result)| result.map_ok(|(result, _trailing)| result)),
104        )
105    }
106
107    /// Convert self into single element stream.
108    pub fn into_stream(self) -> StreamingResponse<T> {
109        StreamingResponse::new(self.0.map_ok(|(metadata, future)| {
110            let stream = future
111                .map_ok(|(result, trailing)| {
112                    stream::iter(vec![
113                        ItemOrMetadata::Item(result),
114                        ItemOrMetadata::TrailingMetadata(trailing),
115                    ])
116                    .map(Ok)
117                })
118                .try_flatten_stream();
119
120            (metadata, GrpcStreamWithTrailingMetadata::new(stream))
121        }))
122    }
123}
124
125impl<T: Send + 'static> Future for SingleResponse<T> {
126    type Output = crate::Result<(Metadata, GrpcFuture<(T, Metadata)>)>;
127
128    fn poll(
129        mut self: Pin<&mut Self>,
130        cx: &mut Context<'_>,
131    ) -> Poll<crate::Result<(Metadata, GrpcFuture<(T, Metadata)>)>> {
132        self.0.poll_unpin(cx)
133    }
134}
135
/// Streaming response
///
/// Newtype over a boxed future that resolves to the initial metadata together
/// with a `GrpcStreamWithTrailingMetadata<T>` carrying the items and the
/// trailing metadata.
pub struct StreamingResponse<T: Send + 'static>(
    /// Initial metadata, stream of items followed by trailing metadata
    pub GrpcFuture<(Metadata, GrpcStreamWithTrailingMetadata<T>)>,
);
141
/// Placeholder for compile-time `Send`/`Sync` assertions on
/// `StreamingResponse<String>`.
///
/// Both assertions are commented out, so the function body is currently empty.
fn _assert_types() {
    //    ::assert_types::assert_send::<StreamingResponse<String>>();
    //    ::assert_types::assert_sync::<StreamingResponse<String>>();
}
146
147impl<T: Send + 'static> StreamingResponse<T> {
148    // constructors
149
150    pub fn new<F>(f: F) -> StreamingResponse<T>
151    where
152        F: Future<Output = crate::Result<(Metadata, GrpcStreamWithTrailingMetadata<T>)>>
153            + Send
154            + 'static,
155    {
156        StreamingResponse(Box::pin(f))
157    }
158
159    pub fn metadata_and_stream_and_trailing_metadata<S, M>(
160        metadata: Metadata,
161        result: S,
162        trailing: M,
163    ) -> StreamingResponse<T>
164    where
165        S: Stream<Item = crate::Result<T>> + Send + 'static,
166        M: Future<Output = crate::Result<Metadata>> + Send + 'static,
167    {
168        let boxed = GrpcStreamWithTrailingMetadata::stream_with_trailing_metadata(result, trailing);
169        StreamingResponse::new(future::ok((metadata, boxed)))
170    }
171
172    pub fn metadata_and_stream<S>(metadata: Metadata, result: S) -> StreamingResponse<T>
173    where
174        S: Stream<Item = crate::Result<T>> + Send + 'static,
175    {
176        let boxed = GrpcStreamWithTrailingMetadata::stream(result);
177        StreamingResponse::new(future::ok((metadata, boxed)))
178    }
179
180    pub fn no_metadata<S>(s: S) -> StreamingResponse<T>
181    where
182        S: Stream<Item = crate::Result<T>> + Send + 'static,
183    {
184        StreamingResponse::metadata_and_stream_and_trailing_metadata(
185            Metadata::new(),
186            s,
187            future::ok(Metadata::new()),
188        )
189    }
190
191    pub fn completed_with_metadata_and_trailing_metadata(
192        metadata: Metadata,
193        r: Vec<T>,
194        trailing: Metadata,
195    ) -> StreamingResponse<T> {
196        StreamingResponse::iter_with_metadata_and_trailing_metadata(
197            metadata,
198            r.into_iter(),
199            future::ok(trailing),
200        )
201    }
202
203    pub fn completed_with_metadata(metadata: Metadata, r: Vec<T>) -> StreamingResponse<T> {
204        StreamingResponse::completed_with_metadata_and_trailing_metadata(
205            metadata,
206            r,
207            Metadata::new(),
208        )
209    }
210
211    pub fn iter_with_metadata_and_trailing_metadata<I, M>(
212        metadata: Metadata,
213        iter: I,
214        trailing: M,
215    ) -> StreamingResponse<T>
216    where
217        I: Iterator<Item = T> + Send + 'static,
218        M: Future<Output = crate::Result<Metadata>> + Send + 'static,
219    {
220        StreamingResponse::metadata_and_stream_and_trailing_metadata(
221            metadata,
222            stream::iter(iter).map(Ok),
223            trailing,
224        )
225    }
226
227    pub fn iter_with_metadata<I>(metadata: Metadata, iter: I) -> StreamingResponse<T>
228    where
229        I: Iterator<Item = T> + Send + 'static,
230    {
231        StreamingResponse::iter_with_metadata_and_trailing_metadata(
232            metadata,
233            iter,
234            future::ok(Metadata::new()),
235        )
236    }
237
238    pub fn completed(r: Vec<T>) -> StreamingResponse<T> {
239        StreamingResponse::completed_with_metadata_and_trailing_metadata(
240            Metadata::new(),
241            r,
242            Metadata::new(),
243        )
244    }
245
246    pub fn iter<I>(iter: I) -> StreamingResponse<T>
247    where
248        I: Iterator<Item = T> + Send + 'static,
249    {
250        StreamingResponse::iter_with_metadata_and_trailing_metadata(
251            Metadata::new(),
252            iter,
253            future::ok(Metadata::new()),
254        )
255    }
256
257    pub fn empty() -> StreamingResponse<T> {
258        StreamingResponse::no_metadata(stream::empty())
259    }
260
261    /// Create an error response
262    pub fn err(err: error::Error) -> StreamingResponse<T> {
263        StreamingResponse::new(future::err(err))
264    }
265
266    // getters
267
268    fn map_stream<U, F>(self, f: F) -> StreamingResponse<U>
269    where
270        U: Send + 'static,
271        F: FnOnce(GrpcStreamWithTrailingMetadata<T>) -> GrpcStreamWithTrailingMetadata<U>
272            + Send
273            + 'static,
274    {
275        StreamingResponse::new(
276            self.0
277                .map_ok(move |(metadata, stream)| (metadata, f(stream))),
278        )
279    }
280
281    pub fn map_items<U, F>(self, f: F) -> StreamingResponse<U>
282    where
283        U: Send + 'static,
284        F: FnMut(T) -> U + Send + 'static,
285    {
286        self.map_stream(move |stream| stream.map_items(f))
287    }
288
289    pub fn and_then_items<U, F>(self, f: F) -> StreamingResponse<U>
290    where
291        U: Send + 'static,
292        F: FnMut(T) -> result::Result<U> + Send + 'static,
293    {
294        self.map_stream(move |stream| stream.and_then_items(f))
295    }
296
297    pub fn drop_metadata(self) -> GrpcStream<T> {
298        Box::pin(
299            self.0
300                .map_ok(|(_metadata, stream)| stream.drop_metadata())
301                .try_flatten_stream(),
302        )
303    }
304
305    pub fn into_future(self) -> SingleResponse<Vec<T>> {
306        SingleResponse::new(self.0.map_ok(|(initial, stream)| {
307            let future: GrpcFuture<(Vec<T>, Metadata)> = stream.collect_with_metadata();
308            (initial, future)
309        }))
310    }
311
312    /// Take single element from stream
313    pub fn single(self) -> SingleResponse<T> {
314        SingleResponse(Box::pin(self.0.map_ok(|(metadata, stream)| {
315            (metadata, stream.single_with_metadata())
316        })))
317    }
318
319    pub fn collect(self) -> GrpcFuture<(Metadata, Vec<T>, Metadata)> {
320        self.into_future().join_metadata_result()
321    }
322}