use async_stream::stream;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use futures_core::future::BoxFuture;
use futures_core::Stream;
use futures_util::StreamExt;
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::Status;
use crate::mirror_query::AnyMirrorQueryData;
use crate::{
Client,
Error,
MirrorQuery,
};
impl<D> MirrorQuery<D>
where
D: MirrorQueryExecute,
{
#[allow(clippy::missing_errors_doc)]
pub async fn execute(&mut self, client: &Client) -> crate::Result<D::Response> {
self.execute_with_optional_timeout(client, None).await
}
pub(crate) async fn execute_with_optional_timeout(
&self,
client: &Client,
timeout: Option<std::time::Duration>,
) -> crate::Result<D::Response> {
self.data.execute_with_optional_timeout(&self.common, client, timeout).await
}
#[allow(clippy::missing_errors_doc)]
pub async fn execute_with_timeout(
&mut self,
client: &Client,
timeout: std::time::Duration,
) -> crate::Result<D::Response> {
self.execute_with_optional_timeout(client, Some(timeout)).await
}
pub fn subscribe<'a>(&self, client: &'a Client) -> D::ItemStream<'a> {
self.subscribe_with_optional_timeout(client, None)
}
pub fn subscribe_with_timeout<'a>(
&self,
client: &'a Client,
timeout: std::time::Duration,
) -> D::ItemStream<'a> {
self.subscribe_with_optional_timeout(client, Some(timeout))
}
pub(crate) fn subscribe_with_optional_timeout<'a>(
&self,
client: &'a Client,
timeout: Option<std::time::Duration>,
) -> D::ItemStream<'a> {
self.data.subscribe_with_optional_timeout(&self.common, client, timeout)
}
}
pub trait MirrorQueryExecute: Sized + Into<AnyMirrorQueryData> + Send + Sync {
type Item;
type Response;
type ItemStream<'a>: Stream<Item = crate::Result<Self::Item>> + 'a
where
Self: 'a;
fn subscribe_with_optional_timeout<'a>(
&self,
params: &crate::mirror_query::MirrorQueryCommon,
client: &'a crate::Client,
timeout: Option<std::time::Duration>,
) -> Self::ItemStream<'a>
where
Self: 'a;
fn execute_with_optional_timeout<'a>(
&'a self,
params: &'a super::MirrorQueryCommon,
client: &'a Client,
timeout: Option<std::time::Duration>,
) -> BoxFuture<'a, crate::Result<Self::Response>>;
}
impl<T> MirrorQueryExecute for T
where
T: MirrorRequest + Sync + Clone + Into<AnyMirrorQueryData>,
{
type Item = <Self as MirrorRequest>::Item;
type Response = <Self as MirrorRequest>::Response;
type ItemStream<'a> = <Self as MirrorRequest>::ItemStream<'a> where Self: 'a;
fn subscribe_with_optional_timeout<'a>(
&self,
_params: &crate::mirror_query::MirrorQueryCommon,
client: &'a crate::Client,
timeout: Option<std::time::Duration>,
) -> Self::ItemStream<'a>
where
Self: 'a,
{
let timeout = timeout.or_else(|| client.request_timeout()).unwrap_or_else(|| {
std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
});
let channel = client.mirror_network().channel();
let self_ = self.clone();
Self::make_item_stream(crate::mirror_query::subscribe(channel, timeout, self_))
}
fn execute_with_optional_timeout<'a>(
&'a self,
_params: &'a crate::mirror_query::MirrorQueryCommon,
client: &crate::Client,
timeout: Option<std::time::Duration>,
) -> BoxFuture<'a, crate::Result<Self::Response>> {
let timeout = timeout.or_else(|| client.request_timeout()).unwrap_or_else(|| {
std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
});
let channel = client.mirror_network().channel();
Self::try_collect(crate::mirror_query::subscribe(channel, timeout, self.clone()))
}
}
pub trait MirrorRequest: Send {
type GrpcItem: Send;
type ConnectStream: Stream<Item = tonic::Result<Self::GrpcItem>> + Send;
type Item;
type Response;
type ItemStream<'a>: Stream<Item = crate::Result<Self::Item>> + 'a;
fn connect(&self, channel: Channel) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>>;
#[allow(unused_variables)]
fn should_retry(&self, status_code: tonic::Code) -> bool {
false
}
fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
where
S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a;
fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
where
S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a;
}
pub(crate) fn subscribe<I: Send, R: MirrorRequest<GrpcItem = I> + Send + Sync>(
channel: Channel,
timeout: std::time::Duration,
request: R,
) -> impl Stream<Item = crate::Result<I>> + Send {
stream! {
let request = request;
let mut backoff = ExponentialBackoff {
max_elapsed_time: Some(timeout),
..ExponentialBackoff::default()
};
let mut backoff_inf = ExponentialBackoff::default();
backoff_inf.max_elapsed_time = None;
loop {
let status: Status = 'request: loop {
let response = request.connect(channel.clone()).await;
let stream = match response {
Ok(stream) => stream,
Err(status) => {
break 'request status;
}
};
futures_util::pin_mut!(stream);
backoff.reset();
backoff_inf.reset();
#[allow(unused_labels)]
'message: loop {
let message = stream.next().await.transpose();
let message = match message {
Ok(Some(message)) => message,
Ok(None) => {
return;
}
Err(status) => {
break 'request status;
}
};
yield Ok(message);
}
};
match status.code() {
tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
sleep(backoff_inf.next_backoff().unwrap()).await;
}
tonic::Code::Unknown if status.message() == "error reading a body from connection: connection reset" => {
sleep(backoff_inf.next_backoff().unwrap()).await;
}
code if request.should_retry(code) => {
if let Some(duration) = backoff.next_backoff() {
sleep(duration).await;
} else {
yield Err(Error::TimedOut(Error::from(status).into()));
return;
}
}
_ => {
yield Err(Error::from(status));
return;
}
}
}
}
}