use super::{StreamOf, StreamOfResults};
use crate::error::BackendError;
use futures::{FutureExt, Stream, StreamExt};
use std::{future::Future, pin::Pin, task::Poll};
/// Runs `future` in the background and discards its output.
///
/// - On non-wasm targets the future is handed to `tokio::spawn`.
/// - On `wasm32` with `target_os = "unknown"` it is handed to
///   `wasm_bindgen_futures::spawn_local`.
///
/// On any other wasm target neither branch is compiled in, so the future is
/// dropped without being polled.
#[cfg(feature = "runtime")]
pub(crate) fn spawn<F>(future: F)
where
    F: Future + Send + 'static,
{
    // The wrapper block exists only to throw away whatever `future` resolves to.
    #[cfg(not(target_family = "wasm"))]
    tokio::spawn(async move {
        let _ = future.await;
    });

    #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
    wasm_bindgen_futures::spawn_local(async move {
        let _ = future.await;
    });
}
/// Calls `retry_future` repeatedly until it succeeds or fails with an error
/// that is not worth retrying.
///
/// Retry policy:
/// - An error for which `is_disconnected_will_reconnect()` is true is retried
///   without any limit.
/// - An error for which `is_rpc_limit_reached()` is true is retried at most
///   10 times in total for this call (the counter is never reset).
/// - Any other error, or a limit-reached error once the budget is used up, is
///   returned to the caller.
///
/// # Errors
///
/// Returns the first error that falls outside the retry policy above.
pub async fn retry<T, F, R>(mut retry_future: F) -> Result<R, BackendError>
where
    F: FnMut() -> T,
    T: Future<Output = Result<R, BackendError>>,
{
    const REJECTED_MAX_RETRIES: usize = 10;
    let mut rejections_seen = 0;

    loop {
        let err = match retry_future().await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };

        // Reconnects are always retried and do not consume the rejection budget.
        if err.is_disconnected_will_reconnect() {
            continue;
        }

        let may_retry_rejection =
            err.is_rpc_limit_reached() && rejections_seen < REJECTED_MAX_RETRIES;
        if !may_retry_rejection {
            return Err(err);
        }
        rejections_seen += 1;
    }
}
/// Wraps `get_stream` in a stream that can re-create its inner subscription.
///
/// Nothing is subscribed here: the returned stream starts in its initial state
/// and calls `get_stream` (through [`retry`]) the first time it is polled, and
/// again whenever it goes back to that initial state.
///
/// This function performs no fallible work itself and always returns `Ok`.
pub async fn retry_stream<F, Fut, R>(get_stream: F) -> Result<StreamOfResults<R>, BackendError>
where
    F: Clone + Send + 'static + FnMut() -> Fut,
    Fut: Future<Output = Result<StreamOfResults<R>, BackendError>> + Send,
    R: Send + 'static,
{
    // Every subscription attempt works on its own clone of the caller's closure,
    // so the original stays available for later attempts.
    let resubscribe = move || {
        let subscribe_once = get_stream.clone();
        async move { retry(subscribe_once).await }.boxed()
    };

    let subscription = RetrySubscription {
        resubscribe,
        state: RetrySubscriptionState::Init,
    };

    Ok(StreamOf::new(Box::pin(subscription)))
}
/// Stream adapter that forwards items from an inner stream and can replace that
/// stream with a fresh one obtained from `resubscribe`.
///
/// `F` is the resubscribe closure, `R` the future it returns and `T` the item
/// type carried by the inner stream.
struct RetrySubscription<F, R, T> {
/// Called to start a new subscription attempt; returns the future to wait on.
resubscribe: F,
/// Where the adapter currently is in its subscribe/stream cycle.
state: RetrySubscriptionState<R, T>,
}
/// States of the [`RetrySubscription`] state machine.
enum RetrySubscriptionState<R, T> {
/// No subscription attempt is in flight; the next poll calls `resubscribe`.
Init,
/// A resubscribe future is being awaited.
Pending(R),
/// An inner stream is active and its items are being forwarded.
Stream(StreamOfResults<T>),
/// Terminal state: every further poll yields `None`.
Done,
}
// Declared `Unpin` for all type parameters; `poll_next` relies on this to assign
// to `self.state` through `Pin<&mut Self>`.
impl<F, R, T> std::marker::Unpin for RetrySubscription<F, R, T> {}
/// Drives the retry state machine and forwards the items of the active stream.
///
/// Each poll dispatches on the current state:
/// - `Init`: call `resubscribe` and start waiting on the future it returns.
/// - `Pending`: poll that future; on success switch to the new stream.
/// - `Stream`: forward items from the active stream.
/// - `Done`: the stream is finished and only yields `None`.
impl<F, R, T> Stream for RetrySubscription<F, R, T>
where
    F: FnMut() -> R,
    R: Future<Output = Result<StreamOfResults<T>, BackendError>> + Unpin,
{
    type Item = Result<T, BackendError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                RetrySubscriptionState::Init => {
                    // Start a new subscription attempt, then loop to poll it right away.
                    self.state = RetrySubscriptionState::Pending((self.resubscribe)());
                }
                RetrySubscriptionState::Stream(s) => match s.poll_next_unpin(cx) {
                    Poll::Ready(Some(Err(err))) => {
                        // The error is always surfaced to the consumer. A reconnect
                        // error additionally schedules a resubscribe for the next
                        // poll; any other error leaves the current stream in place.
                        if err.is_disconnected_will_reconnect() {
                            self.state = RetrySubscriptionState::Init;
                        }
                        return Poll::Ready(Some(Err(err)));
                    }
                    Poll::Ready(None) => {
                        self.state = RetrySubscriptionState::Done;
                        return Poll::Ready(None);
                    }
                    Poll::Ready(Some(Ok(val))) => {
                        return Poll::Ready(Some(Ok(val)));
                    }
                    Poll::Pending => {
                        return Poll::Pending;
                    }
                },
                RetrySubscriptionState::Pending(fut) => match fut.poll_unpin(cx) {
                    Poll::Ready(Err(err)) => {
                        // A failed resubscribe is retried only when the error says the
                        // connection is coming back; anything else ends the stream
                        // after this error has been reported. Previously `Done` was
                        // assigned unconditionally, which silently cancelled the retry.
                        self.state = if err.is_disconnected_will_reconnect() {
                            RetrySubscriptionState::Init
                        } else {
                            RetrySubscriptionState::Done
                        };
                        return Poll::Ready(Some(Err(err)));
                    }
                    Poll::Ready(Ok(stream)) => {
                        // Loop again so the new stream is polled (and registers its
                        // waker) before we return.
                        self.state = RetrySubscriptionState::Stream(stream);
                        continue;
                    }
                    Poll::Pending => {
                        return Poll::Pending;
                    }
                },
                RetrySubscriptionState::Done => return Poll::Ready(None),
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::StreamOf;

    /// Error that reports "disconnected, will reconnect".
    fn disconnect_err() -> BackendError {
        let rpc_err = subxt_rpcs::Error::DisconnectedWillReconnect(String::new());
        BackendError::Rpc(rpc_err.into())
    }

    /// Error that is neither a reconnect nor a rate-limit error.
    fn custom_err() -> BackendError {
        BackendError::other("")
    }

    /// After a reconnect error the stream resubscribes and replays from the start.
    #[tokio::test]
    async fn retry_stream_works() {
        let stream = retry_stream(|| {
            async {
                let items = [Ok(1), Ok(2), Ok(3), Err(disconnect_err())];
                Ok(StreamOf::new(Box::pin(futures::stream::iter(items))))
            }
            .boxed()
        })
        .await
        .unwrap();

        let collected: Vec<Result<usize, BackendError>> = stream.take(5).collect().await;

        assert!(matches!(collected[0], Ok(1)));
        assert!(matches!(collected[1], Ok(2)));
        assert!(matches!(collected[2], Ok(3)));
        assert!(matches!(&collected[3], Err(e) if e.is_disconnected_will_reconnect()));
        assert!(matches!(collected[4], Ok(1)));
    }

    /// A reconnect error on the active stream switches over to the resubscribed stream.
    #[tokio::test]
    async fn retry_sub_works() {
        let initial = futures::stream::iter([Ok(1), Err(disconnect_err())]);
        let resubscribe = Box::new(move || {
            async move { Ok(StreamOf::new(Box::pin(futures::stream::iter([Ok(2)])))) }.boxed()
        });
        let subscription = RetrySubscription {
            state: RetrySubscriptionState::Stream(StreamOf::new(Box::pin(initial))),
            resubscribe,
        };

        let collected: Vec<_> = subscription.collect().await;

        assert!(matches!(collected[0], Ok(1)));
        assert!(matches!(&collected[1], Err(e) if e.is_disconnected_will_reconnect()));
        assert!(matches!(collected[2], Ok(2)));
    }

    /// When the active stream simply ends, no resubscribe is attempted.
    #[tokio::test]
    async fn retry_sub_err_terminates_stream() {
        let initial = futures::stream::iter([Ok(1)]);
        let resubscribe = Box::new(|| async move { Err(custom_err()) }.boxed());
        let subscription = RetrySubscription {
            state: RetrySubscriptionState::Stream(StreamOf::new(Box::pin(initial))),
            resubscribe,
        };

        let yielded = subscription.count().await;

        assert_eq!(yielded, 1);
    }

    /// A resubscribe failure is reported as an item and then the stream finishes.
    #[tokio::test]
    async fn retry_sub_resubscribe_err() {
        let initial = futures::stream::iter([Ok(1), Err(disconnect_err())]);
        let resubscribe = Box::new(|| async move { Err(custom_err()) }.boxed());
        let subscription = RetrySubscription {
            state: RetrySubscriptionState::Stream(StreamOf::new(Box::pin(initial))),
            resubscribe,
        };

        let collected: Vec<_> = subscription.collect().await;

        assert!(matches!(collected[0], Ok(1)));
        assert!(matches!(&collected[1], Err(e) if e.is_disconnected_will_reconnect()));
        assert!(matches!(&collected[2], Err(BackendError::Other(_))));
    }
}