use mssf_core::{
ErrorCode,
runtime::executor::{BoxedCancelToken, Timer},
};
use std::{pin::Pin, time::Duration};
struct TimeCounter {
timeout: Duration,
start: std::time::Instant,
}
impl TimeCounter {
pub fn new(timeout: Duration) -> Self {
TimeCounter {
timeout,
start: std::time::Instant::now(),
}
}
pub fn elapsed(&self) -> Duration {
self.start.elapsed()
}
pub fn remaining(&self) -> mssf_core::Result<Duration> {
if self.elapsed() < self.timeout {
Ok(self.timeout - self.elapsed())
} else {
Err(ErrorCode::FABRIC_E_TIMEOUT.into())
}
}
pub fn sleep_until_remaining(
&self,
timer: &dyn Timer,
) -> mssf_core::Result<impl Future<Output = ()>> {
let remaining = self.remaining()?;
Ok(timer.sleep(remaining))
}
}
#[derive(Default)]
pub struct OperationRetryerBuilder {
timer: Option<Box<dyn Timer>>,
default_timeout: Option<Duration>,
max_retry_interval: Option<Duration>,
}
impl OperationRetryerBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_timer(mut self, timer: Box<dyn Timer>) -> Self {
self.timer = Some(timer);
self
}
pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}
pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
self.max_retry_interval = Some(interval);
self
}
pub fn build(self) -> OperationRetryer {
OperationRetryer::new(
self.timer.unwrap_or(Box::new(crate::tokio::TokioTimer)),
self.default_timeout.unwrap_or(Duration::from_secs(30)),
self.max_retry_interval.unwrap_or(Duration::from_secs(5)),
)
}
}
pub struct OperationRetryer {
timer: Box<dyn Timer>,
default_timeout: Duration,
max_retry_interval: Duration,
}
impl OperationRetryer {
pub fn builder() -> OperationRetryerBuilder {
OperationRetryerBuilder::new()
}
fn new(timer: Box<dyn Timer>, default_timeout: Duration, max_retry_interval: Duration) -> Self {
OperationRetryer {
timer,
default_timeout,
max_retry_interval,
}
}
pub async fn run<T, F, Fut>(
&self,
op: F,
timeout: Option<Duration>,
token: Option<BoxedCancelToken>,
) -> mssf_core::Result<T>
where
F: Fn(Duration, Option<BoxedCancelToken>) -> Fut,
Fut: Future<Output = mssf_core::Result<T>> + Send,
T: Send,
{
let timeout = timeout.unwrap_or(self.default_timeout);
let timer = TimeCounter::new(timeout);
let mut cancel: Pin<Box<dyn std::future::Future<Output = ()> + Send>> =
if let Some(t) = &token {
t.wait()
} else {
Box::pin(std::future::pending())
};
loop {
let res = tokio::select! {
_ = timer.sleep_until_remaining(self.timer.as_ref())? => {
return Err(ErrorCode::FABRIC_E_TIMEOUT.into());
}
_ = &mut cancel => {
return Err(ErrorCode::E_ABORT.into());
}
res = op(timer.remaining()?, token.clone()) => res,
};
match res {
Ok(r) => return Ok(r),
Err(e) => match e.try_as_fabric_error_code() {
Ok(ec) if ec == ErrorCode::FABRIC_E_TIMEOUT || ec.is_transient() => {
#[cfg(feature = "tracing")]
tracing::debug!(
"Operation transient error {ec}. Remaining time {:?}. Retrying...",
timer.remaining()?
);
}
_ => return Err(e),
},
}
tokio::select! {
_ = self.timer.sleep(self.max_retry_interval) => {},
_ = timer.sleep_until_remaining(self.timer.as_ref())? => {
return Err(ErrorCode::FABRIC_E_TIMEOUT.into());
}
_ = &mut cancel => {
return Err(ErrorCode::E_ABORT.into());
}
}
}
}
}