use std::time::Duration;
use futures::FutureExt;
use opcua_types::StatusCode;
use crate::retry::ExponentialBackoff;
use super::{session_debug, Session, UARequest};
pub trait RequestRetryPolicy {
fn get_next_delay(&mut self, status: StatusCode) -> Option<Duration>;
}
impl RequestRetryPolicy for Box<dyn RequestRetryPolicy + Send> {
fn get_next_delay(&mut self, status: StatusCode) -> Option<Duration> {
(**self).get_next_delay(status)
}
}
#[derive(Clone)]
pub struct DefaultRetryPolicy<'a> {
backoff: ExponentialBackoff,
extra_status_codes: &'a [StatusCode],
}
impl<'a> DefaultRetryPolicy<'a> {
pub fn new(backoff: ExponentialBackoff) -> Self {
Self {
backoff,
extra_status_codes: &[],
}
}
pub fn new_with_extras(
backoff: ExponentialBackoff,
extra_status_codes: &'a [StatusCode],
) -> Self {
Self {
backoff,
extra_status_codes,
}
}
}
impl RequestRetryPolicy for DefaultRetryPolicy<'_> {
fn get_next_delay(&mut self, status: StatusCode) -> Option<Duration> {
let should_retry = matches!(
status,
StatusCode::BadUnexpectedError
| StatusCode::BadInternalError
| StatusCode::BadOutOfMemory
| StatusCode::BadResourceUnavailable
| StatusCode::BadCommunicationError
| StatusCode::BadTimeout
| StatusCode::BadShutdown
| StatusCode::BadServerNotConnected
| StatusCode::BadServerHalted
| StatusCode::BadNonceInvalid
| StatusCode::BadSessionClosed
| StatusCode::BadSessionIdInvalid
| StatusCode::BadSessionNotActivated
| StatusCode::BadNoCommunication
| StatusCode::BadTooManySessions
| StatusCode::BadTcpServerTooBusy
| StatusCode::BadTcpSecureChannelUnknown
| StatusCode::BadTcpNotEnoughResources
| StatusCode::BadTcpInternalError
| StatusCode::BadSecureChannelClosed
| StatusCode::BadSecureChannelIdInvalid
| StatusCode::BadNotConnected
| StatusCode::BadDeviceFailure
| StatusCode::BadSensorFailure
| StatusCode::BadDisconnect
| StatusCode::BadConnectionClosed
| StatusCode::BadEndOfStream
| StatusCode::BadInvalidState
| StatusCode::BadMaxConnectionsReached
| StatusCode::BadConnectionRejected
) || self.extra_status_codes.contains(&status);
if should_retry {
self.backoff.next()
} else {
None
}
}
}
impl Session {
pub async fn send_with_retry<T: UARequest + Clone>(
&self,
request: T,
mut policy: impl RequestRetryPolicy,
) -> Result<T::Out, StatusCode> {
loop {
let next_request = request.clone();
match next_request.send(&self.channel).boxed().await {
Ok(r) => break Ok(r),
Err(e) => {
if let Some(delay) = policy.get_next_delay(e) {
session_debug!(self, "Request failed, retrying after {delay:?}");
tokio::time::sleep(delay).await;
} else {
break Err(e);
}
}
}
}
}
}