use futures::future;
use futures::stream;
use futures::stream::Stream;
use futures::stream::StreamExt;
use crate::error::Error;
use futures::channel::mpsc;
use crate::futures_grpc::GrpcStream;
use crate::proto::metadata::Metadata;
#[derive(Debug, Default, Clone)]
pub struct RequestOptions {
pub metadata: Metadata,
pub cachable: bool,
}
impl RequestOptions {
pub fn new() -> RequestOptions {
Default::default()
}
}
pub struct StreamingRequest<T: Send + 'static>(pub GrpcStream<T>);
impl<T: Send + 'static> StreamingRequest<T> {
pub fn new<S>(stream: S) -> StreamingRequest<T>
where
S: Stream<Item = crate::Result<T>> + Send + 'static,
{
StreamingRequest(Box::pin(stream))
}
pub fn once(item: T) -> StreamingRequest<T> {
StreamingRequest::new(stream::once(future::ok(item)))
}
pub fn iter<I>(iter: I) -> StreamingRequest<T>
where
I: IntoIterator<Item = T>,
I::IntoIter: Send + 'static,
{
StreamingRequest::new(stream::iter(iter.into_iter()).map(Ok))
}
pub fn mpsc() -> (StreamingRequestSender<T>, StreamingRequest<T>) {
let (tx, rx) = mpsc::channel(0);
let tx = StreamingRequestSender { sender: Some(tx) };
let rx = StreamingRequest::new(rx.map(Ok));
(tx, rx)
}
pub fn single(item: T) -> StreamingRequest<T> {
StreamingRequest::new(stream::once(future::ok(item)))
}
pub fn empty() -> StreamingRequest<T> {
StreamingRequest::new(stream::empty())
}
pub fn err(err: Error) -> StreamingRequest<T> {
StreamingRequest::new(stream::once(future::err(err)))
}
}
pub struct StreamingRequestSender<T: Send + 'static> {
sender: Option<mpsc::Sender<T>>,
}
impl<T: Send + 'static> Drop for StreamingRequestSender<T> {
fn drop(&mut self) {
}
}