use futures::future;
use futures::future::Future;
use futures::stream;
use futures::stream::Stream;
use error;
use futures_grpc::*;
use iter::*;
use metadata::*;
use result;
use stream_item::*;
pub struct SingleResponse<T: Send + 'static>(pub GrpcFuture<(Metadata, GrpcFuture<(T, Metadata)>)>);
impl<T: Send + 'static> SingleResponse<T> {
pub fn new<F>(f: F) -> SingleResponse<T>
where
F: Future<Item = (Metadata, GrpcFuture<(T, Metadata)>), Error = error::Error>
+ Send
+ 'static,
{
SingleResponse(Box::new(f))
}
pub fn metadata_and_future<F>(metadata: Metadata, result: F) -> SingleResponse<T>
where
F: Future<Item = T, Error = error::Error> + Send + 'static,
{
SingleResponse::metadata_and_future_and_trailing_metadata(
metadata,
result,
future::ok(Metadata::new()),
)
}
pub fn metadata_and_future_and_trailing_metadata<F, M>(
metadata: Metadata,
result: F,
trailing: M,
) -> SingleResponse<T>
where
F: Future<Item = T, Error = error::Error> + Send + 'static,
M: Future<Item = Metadata, Error = error::Error> + Send + 'static,
{
let future: GrpcFuture<(T, Metadata)> = Box::new(result.join(trailing));
SingleResponse::new(future::finished((metadata, future)))
}
pub fn completed_with_metadata_and_trailing_metadata(
metadata: Metadata,
r: T,
trailing: Metadata,
) -> SingleResponse<T> {
SingleResponse::metadata_and_future_and_trailing_metadata(
metadata,
future::ok(r),
future::ok(trailing),
)
}
pub fn completed_with_metadata(metadata: Metadata, r: T) -> SingleResponse<T> {
SingleResponse::completed_with_metadata_and_trailing_metadata(metadata, r, Metadata::new())
}
pub fn completed(r: T) -> SingleResponse<T> {
SingleResponse::completed_with_metadata_and_trailing_metadata(
Metadata::new(),
r,
Metadata::new(),
)
}
pub fn no_metadata<F>(r: F) -> SingleResponse<T>
where
F: Future<Item = T, Error = error::Error> + Send + 'static,
{
SingleResponse::metadata_and_future(Metadata::new(), r)
}
pub fn err(err: error::Error) -> SingleResponse<T> {
SingleResponse::new(future::err(err))
}
pub fn join_metadata_result(self) -> GrpcFuture<(Metadata, T, Metadata)> {
Box::new(self.0.and_then(|(initial, result)| {
result.map(|(result, trailing)| (initial, result, trailing))
}))
}
pub fn drop_metadata(self) -> GrpcFuture<T> {
Box::new(
self.0
.and_then(|(_initial, result)| result.map(|(result, _trailing)| result)),
)
}
pub fn into_stream(self) -> StreamingResponse<T> {
StreamingResponse::new(self.0.map(|(metadata, future)| {
let stream = future
.map(|(result, trailing)| {
stream::iter_ok(vec![
ItemOrMetadata::Item(result),
ItemOrMetadata::TrailingMetadata(trailing),
])
}).flatten_stream();
(metadata, GrpcStreamWithTrailingMetadata::new(stream))
}))
}
pub fn wait(self) -> result::Result<(Metadata, T, Metadata)> {
self.join_metadata_result().wait()
}
pub fn wait_drop_metadata(self) -> result::Result<T> {
self.wait().map(|(_initial, r, _trailing)| r)
}
}
pub struct StreamingResponse<T: Send + 'static>(
pub GrpcFuture<(Metadata, GrpcStreamWithTrailingMetadata<T>)>,
);
impl<T: Send + 'static> StreamingResponse<T> {
pub fn new<F>(f: F) -> StreamingResponse<T>
where
F: Future<Item = (Metadata, GrpcStreamWithTrailingMetadata<T>), Error = error::Error>
+ Send
+ 'static,
{
StreamingResponse(Box::new(f))
}
pub fn metadata_and_stream_and_trailing_metadata<S, M>(
metadata: Metadata,
result: S,
trailing: M,
) -> StreamingResponse<T>
where
S: Stream<Item = T, Error = error::Error> + Send + 'static,
M: Future<Item = Metadata, Error = error::Error> + Send + 'static,
{
let boxed = GrpcStreamWithTrailingMetadata::stream_with_trailing_metadata(result, trailing);
StreamingResponse::new(future::ok((metadata, boxed)))
}
pub fn metadata_and_stream<S>(metadata: Metadata, result: S) -> StreamingResponse<T>
where
S: Stream<Item = T, Error = error::Error> + Send + 'static,
{
let boxed = GrpcStreamWithTrailingMetadata::stream(result);
StreamingResponse::new(future::ok((metadata, boxed)))
}
pub fn no_metadata<S>(s: S) -> StreamingResponse<T>
where
S: Stream<Item = T, Error = error::Error> + Send + 'static,
{
StreamingResponse::metadata_and_stream_and_trailing_metadata(
Metadata::new(),
s,
future::ok(Metadata::new()),
)
}
pub fn completed_with_metadata_and_trailing_metadata(
metadata: Metadata,
r: Vec<T>,
trailing: Metadata,
) -> StreamingResponse<T> {
StreamingResponse::iter_with_metadata_and_trailing_metadata(
metadata,
r.into_iter(),
future::ok(trailing),
)
}
pub fn completed_with_metadata(metadata: Metadata, r: Vec<T>) -> StreamingResponse<T> {
StreamingResponse::completed_with_metadata_and_trailing_metadata(
metadata,
r,
Metadata::new(),
)
}
pub fn iter_with_metadata_and_trailing_metadata<I, M>(
metadata: Metadata,
iter: I,
trailing: M,
) -> StreamingResponse<T>
where
I: Iterator<Item = T> + Send + 'static,
M: Future<Item = Metadata, Error = error::Error> + Send + 'static,
{
StreamingResponse::metadata_and_stream_and_trailing_metadata(
metadata,
stream::iter_ok(iter),
trailing,
)
}
pub fn iter_with_metadata<I>(metadata: Metadata, iter: I) -> StreamingResponse<T>
where
I: Iterator<Item = T> + Send + 'static,
{
StreamingResponse::iter_with_metadata_and_trailing_metadata(
metadata,
iter,
future::ok(Metadata::new()),
)
}
pub fn completed(r: Vec<T>) -> StreamingResponse<T> {
StreamingResponse::completed_with_metadata_and_trailing_metadata(
Metadata::new(),
r,
Metadata::new(),
)
}
pub fn iter<I>(iter: I) -> StreamingResponse<T>
where
I: Iterator<Item = T> + Send + 'static,
{
StreamingResponse::iter_with_metadata_and_trailing_metadata(
Metadata::new(),
iter,
future::ok(Metadata::new()),
)
}
pub fn empty() -> StreamingResponse<T> {
StreamingResponse::no_metadata(stream::empty())
}
pub fn err(err: error::Error) -> StreamingResponse<T> {
StreamingResponse::new(future::err(err))
}
fn map_stream<U, F>(self, f: F) -> StreamingResponse<U>
where
U: Send + 'static,
F: FnOnce(GrpcStreamWithTrailingMetadata<T>) -> GrpcStreamWithTrailingMetadata<U>
+ Send
+ 'static,
{
StreamingResponse::new(self.0.map(move |(metadata, stream)| (metadata, f(stream))))
}
pub fn map_items<U, F>(self, f: F) -> StreamingResponse<U>
where
U: Send + 'static,
F: FnMut(T) -> U + Send + 'static,
{
self.map_stream(move |stream| stream.map_items(f))
}
pub fn and_then_items<U, F>(self, f: F) -> StreamingResponse<U>
where
U: Send + 'static,
F: FnMut(T) -> result::Result<U> + Send + 'static,
{
self.map_stream(move |stream| stream.and_then_items(f))
}
pub fn drop_metadata(self) -> GrpcStream<T> {
Box::new(
self.0
.map(|(_metadata, stream)| stream.drop_metadata())
.flatten_stream(),
)
}
pub fn into_future(self) -> SingleResponse<Vec<T>> {
SingleResponse::new(self.0.map(|(initial, stream)| {
let future: GrpcFuture<(Vec<T>, Metadata)> = stream.collect_with_metadata();
(initial, future)
}))
}
pub fn single(self) -> SingleResponse<T> {
SingleResponse(Box::new(
self.0
.map(|(metadata, stream)| (metadata, stream.single_with_metadata())),
))
}
pub fn wait(self) -> result::Result<(Metadata, GrpcIterator<T>)> {
let (metadata, stream) = self.0.wait()?;
Ok((metadata, Box::new(stream.wait())))
}
pub fn wait_drop_metadata(self) -> GrpcIterator<T> {
Box::new(self.drop_metadata().wait())
}
pub fn collect(self) -> GrpcFuture<(Metadata, Vec<T>, Metadata)> {
self.into_future().join_metadata_result()
}
}