1use futures::future;
2use futures::future::FutureExt;
3use futures::future::TryFutureExt;
4use futures::stream;
5use futures::stream::Stream;
6use futures::stream::StreamExt;
7
8use crate::error;
9
10use crate::futures_grpc::*;
11
12use crate::proto::metadata::Metadata;
13use crate::result;
14use crate::stream_item::*;
15use futures::task::Context;
16use std::future::Future;
17use std::pin::Pin;
18use std::task::Poll;
19
20pub struct SingleResponse<T: Send + 'static>(pub GrpcFuture<(Metadata, GrpcFuture<(T, Metadata)>)>);
22
23impl<T: Send + 'static> SingleResponse<T> {
24 pub fn new<F>(f: F) -> SingleResponse<T>
27 where
28 F: Future<Output = crate::Result<(Metadata, GrpcFuture<(T, Metadata)>)>> + Send + 'static,
29 {
30 SingleResponse(Box::pin(f))
31 }
32
33 pub fn metadata_and_future<F>(metadata: Metadata, result: F) -> SingleResponse<T>
34 where
35 F: Future<Output = crate::Result<T>> + Send + 'static,
36 {
37 SingleResponse::metadata_and_future_and_trailing_metadata(
38 metadata,
39 result,
40 future::ok(Metadata::new()),
41 )
42 }
43
44 pub fn metadata_and_future_and_trailing_metadata<F, M>(
45 metadata: Metadata,
46 result: F,
47 trailing: M,
48 ) -> SingleResponse<T>
49 where
50 F: Future<Output = crate::Result<T>> + Send + 'static,
51 M: Future<Output = crate::Result<Metadata>> + Send + 'static,
52 {
53 let future: GrpcFuture<(T, Metadata)> = Box::pin(future::try_join(result, trailing));
54 SingleResponse::new(future::ok((metadata, future)))
55 }
56
57 pub fn completed_with_metadata_and_trailing_metadata(
58 metadata: Metadata,
59 r: T,
60 trailing: Metadata,
61 ) -> SingleResponse<T> {
62 SingleResponse::metadata_and_future_and_trailing_metadata(
63 metadata,
64 future::ok(r),
65 future::ok(trailing),
66 )
67 }
68
69 pub fn completed_with_metadata(metadata: Metadata, r: T) -> SingleResponse<T> {
70 SingleResponse::completed_with_metadata_and_trailing_metadata(metadata, r, Metadata::new())
71 }
72
73 pub fn completed(r: T) -> SingleResponse<T> {
74 SingleResponse::completed_with_metadata_and_trailing_metadata(
75 Metadata::new(),
76 r,
77 Metadata::new(),
78 )
79 }
80
81 pub fn no_metadata<F>(r: F) -> SingleResponse<T>
82 where
83 F: Future<Output = crate::Result<T>> + Send + 'static,
84 {
85 SingleResponse::metadata_and_future(Metadata::new(), r)
86 }
87
88 pub fn err(err: error::Error) -> SingleResponse<T> {
89 SingleResponse::new(future::err(err))
90 }
91
92 pub fn join_metadata_result(self) -> GrpcFuture<(Metadata, T, Metadata)> {
95 Box::pin(self.0.and_then(|(initial, result)| {
96 result.map_ok(|(result, trailing)| (initial, result, trailing))
97 }))
98 }
99
100 pub fn drop_metadata(self) -> GrpcFuture<T> {
101 Box::pin(
102 self.0
103 .and_then(|(_initial, result)| result.map_ok(|(result, _trailing)| result)),
104 )
105 }
106
107 pub fn into_stream(self) -> StreamingResponse<T> {
109 StreamingResponse::new(self.0.map_ok(|(metadata, future)| {
110 let stream = future
111 .map_ok(|(result, trailing)| {
112 stream::iter(vec![
113 ItemOrMetadata::Item(result),
114 ItemOrMetadata::TrailingMetadata(trailing),
115 ])
116 .map(Ok)
117 })
118 .try_flatten_stream();
119
120 (metadata, GrpcStreamWithTrailingMetadata::new(stream))
121 }))
122 }
123}
124
125impl<T: Send + 'static> Future for SingleResponse<T> {
126 type Output = crate::Result<(Metadata, GrpcFuture<(T, Metadata)>)>;
127
128 fn poll(
129 mut self: Pin<&mut Self>,
130 cx: &mut Context<'_>,
131 ) -> Poll<crate::Result<(Metadata, GrpcFuture<(T, Metadata)>)>> {
132 self.0.poll_unpin(cx)
133 }
134}
135
136pub struct StreamingResponse<T: Send + 'static>(
138 pub GrpcFuture<(Metadata, GrpcStreamWithTrailingMetadata<T>)>,
140);
141
/// Intentionally empty.
///
/// NOTE(review): presumably a placeholder for compile-time type assertions
/// (the name suggests so) — confirm before removing.
fn _assert_types() {}
146
147impl<T: Send + 'static> StreamingResponse<T> {
148 pub fn new<F>(f: F) -> StreamingResponse<T>
151 where
152 F: Future<Output = crate::Result<(Metadata, GrpcStreamWithTrailingMetadata<T>)>>
153 + Send
154 + 'static,
155 {
156 StreamingResponse(Box::pin(f))
157 }
158
159 pub fn metadata_and_stream_and_trailing_metadata<S, M>(
160 metadata: Metadata,
161 result: S,
162 trailing: M,
163 ) -> StreamingResponse<T>
164 where
165 S: Stream<Item = crate::Result<T>> + Send + 'static,
166 M: Future<Output = crate::Result<Metadata>> + Send + 'static,
167 {
168 let boxed = GrpcStreamWithTrailingMetadata::stream_with_trailing_metadata(result, trailing);
169 StreamingResponse::new(future::ok((metadata, boxed)))
170 }
171
172 pub fn metadata_and_stream<S>(metadata: Metadata, result: S) -> StreamingResponse<T>
173 where
174 S: Stream<Item = crate::Result<T>> + Send + 'static,
175 {
176 let boxed = GrpcStreamWithTrailingMetadata::stream(result);
177 StreamingResponse::new(future::ok((metadata, boxed)))
178 }
179
180 pub fn no_metadata<S>(s: S) -> StreamingResponse<T>
181 where
182 S: Stream<Item = crate::Result<T>> + Send + 'static,
183 {
184 StreamingResponse::metadata_and_stream_and_trailing_metadata(
185 Metadata::new(),
186 s,
187 future::ok(Metadata::new()),
188 )
189 }
190
191 pub fn completed_with_metadata_and_trailing_metadata(
192 metadata: Metadata,
193 r: Vec<T>,
194 trailing: Metadata,
195 ) -> StreamingResponse<T> {
196 StreamingResponse::iter_with_metadata_and_trailing_metadata(
197 metadata,
198 r.into_iter(),
199 future::ok(trailing),
200 )
201 }
202
203 pub fn completed_with_metadata(metadata: Metadata, r: Vec<T>) -> StreamingResponse<T> {
204 StreamingResponse::completed_with_metadata_and_trailing_metadata(
205 metadata,
206 r,
207 Metadata::new(),
208 )
209 }
210
211 pub fn iter_with_metadata_and_trailing_metadata<I, M>(
212 metadata: Metadata,
213 iter: I,
214 trailing: M,
215 ) -> StreamingResponse<T>
216 where
217 I: Iterator<Item = T> + Send + 'static,
218 M: Future<Output = crate::Result<Metadata>> + Send + 'static,
219 {
220 StreamingResponse::metadata_and_stream_and_trailing_metadata(
221 metadata,
222 stream::iter(iter).map(Ok),
223 trailing,
224 )
225 }
226
227 pub fn iter_with_metadata<I>(metadata: Metadata, iter: I) -> StreamingResponse<T>
228 where
229 I: Iterator<Item = T> + Send + 'static,
230 {
231 StreamingResponse::iter_with_metadata_and_trailing_metadata(
232 metadata,
233 iter,
234 future::ok(Metadata::new()),
235 )
236 }
237
238 pub fn completed(r: Vec<T>) -> StreamingResponse<T> {
239 StreamingResponse::completed_with_metadata_and_trailing_metadata(
240 Metadata::new(),
241 r,
242 Metadata::new(),
243 )
244 }
245
246 pub fn iter<I>(iter: I) -> StreamingResponse<T>
247 where
248 I: Iterator<Item = T> + Send + 'static,
249 {
250 StreamingResponse::iter_with_metadata_and_trailing_metadata(
251 Metadata::new(),
252 iter,
253 future::ok(Metadata::new()),
254 )
255 }
256
257 pub fn empty() -> StreamingResponse<T> {
258 StreamingResponse::no_metadata(stream::empty())
259 }
260
261 pub fn err(err: error::Error) -> StreamingResponse<T> {
263 StreamingResponse::new(future::err(err))
264 }
265
266 fn map_stream<U, F>(self, f: F) -> StreamingResponse<U>
269 where
270 U: Send + 'static,
271 F: FnOnce(GrpcStreamWithTrailingMetadata<T>) -> GrpcStreamWithTrailingMetadata<U>
272 + Send
273 + 'static,
274 {
275 StreamingResponse::new(
276 self.0
277 .map_ok(move |(metadata, stream)| (metadata, f(stream))),
278 )
279 }
280
281 pub fn map_items<U, F>(self, f: F) -> StreamingResponse<U>
282 where
283 U: Send + 'static,
284 F: FnMut(T) -> U + Send + 'static,
285 {
286 self.map_stream(move |stream| stream.map_items(f))
287 }
288
289 pub fn and_then_items<U, F>(self, f: F) -> StreamingResponse<U>
290 where
291 U: Send + 'static,
292 F: FnMut(T) -> result::Result<U> + Send + 'static,
293 {
294 self.map_stream(move |stream| stream.and_then_items(f))
295 }
296
297 pub fn drop_metadata(self) -> GrpcStream<T> {
298 Box::pin(
299 self.0
300 .map_ok(|(_metadata, stream)| stream.drop_metadata())
301 .try_flatten_stream(),
302 )
303 }
304
305 pub fn into_future(self) -> SingleResponse<Vec<T>> {
306 SingleResponse::new(self.0.map_ok(|(initial, stream)| {
307 let future: GrpcFuture<(Vec<T>, Metadata)> = stream.collect_with_metadata();
308 (initial, future)
309 }))
310 }
311
312 pub fn single(self) -> SingleResponse<T> {
314 SingleResponse(Box::pin(self.0.map_ok(|(metadata, stream)| {
315 (metadata, stream.single_with_metadata())
316 })))
317 }
318
319 pub fn collect(self) -> GrpcFuture<(Metadata, Vec<T>, Metadata)> {
320 self.into_future().join_metadata_result()
321 }
322}