use super::{Error, RetryPolicy, RetryResult, RetryState, ThrottleResult};
use crate::error::rpc::Code;
use std::time::Duration;
#[derive(Debug)]
pub struct TooManyRequests<P>
where
P: RetryPolicy,
{
inner: P,
}
impl<P> TooManyRequests<P>
where
P: RetryPolicy,
{
pub fn new(inner: P) -> Self {
Self { inner }
}
fn is_resource_exhausted(e: &Error) -> bool {
e.status()
.is_some_and(|s| s.code == Code::ResourceExhausted)
}
fn is_too_many_requests(e: &Error) -> bool {
e.http_status_code()
.is_some_and(|code| code == http::StatusCode::TOO_MANY_REQUESTS.as_u16())
}
}
impl<P> RetryPolicy for TooManyRequests<P>
where
P: RetryPolicy,
{
fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
if Self::is_resource_exhausted(&error) || Self::is_too_many_requests(&error) {
return RetryResult::Continue(error);
}
self.inner.on_error(state, error)
}
fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
self.inner.on_throttle(state, error)
}
fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
self.inner.remaining_time(state)
}
}
#[cfg(test)]
mod tests {
use super::super::tests::{MockPolicy, idempotent_state};
use super::*;
use crate::error::rpc::Status;
use crate::retry_policy::NeverRetry;
use std::time::Instant;
fn too_many_requests() -> Error {
Error::service(Status::default().set_code(Code::ResourceExhausted))
}
fn too_many_requests_http() -> Error {
Error::http(
http::StatusCode::TOO_MANY_REQUESTS.as_u16(),
http::HeaderMap::new(),
bytes::Bytes::new(),
)
}
fn permanent() -> Error {
Error::service(Status::default().set_code(Code::PermissionDenied))
}
fn transient() -> Error {
Error::service(Status::default().set_code(Code::Unavailable))
}
#[test]
fn on_error() {
let policy = TooManyRequests::new(NeverRetry);
let result = policy.on_error(&idempotent_state(Instant::now()), too_many_requests());
assert!(matches!(result, RetryResult::Continue(_)), "{result:?}");
let result = policy.on_error(&idempotent_state(Instant::now()), too_many_requests_http());
assert!(matches!(result, RetryResult::Continue(_)), "{result:?}");
let result = policy.on_error(&idempotent_state(Instant::now()), permanent());
assert!(matches!(result, RetryResult::Exhausted(_)), "{result:?}");
}
#[test]
fn ext() {
use super::super::RetryPolicyExt;
let policy = NeverRetry.continue_on_too_many_requests();
let result = policy.on_error(&idempotent_state(Instant::now()), too_many_requests());
assert!(matches!(result, RetryResult::Continue(_)), "{result:?}");
let result = policy.on_error(&idempotent_state(Instant::now()), too_many_requests_http());
assert!(matches!(result, RetryResult::Continue(_)), "{result:?}");
let result = policy.on_error(&idempotent_state(Instant::now()), permanent());
assert!(matches!(result, RetryResult::Exhausted(_)), "{result:?}");
}
#[test]
fn forwards() {
let mut mock = MockPolicy::new();
mock.expect_on_error()
.times(1..)
.returning(|_, e| RetryResult::Permanent(e));
mock.expect_on_throttle()
.times(1..)
.returning(|_, e| ThrottleResult::Exhausted(e));
mock.expect_remaining_time().times(1).returning(|_| None);
let policy = TooManyRequests::new(mock);
let result = policy.on_error(&idempotent_state(Instant::now()), transient());
assert!(matches!(result, RetryResult::Permanent(_)), "{result:?}");
let result = policy.on_error(&idempotent_state(Instant::now()), too_many_requests());
assert!(matches!(result, RetryResult::Continue(_)), "{result:?}");
let result = policy.on_throttle(&idempotent_state(Instant::now()), transient());
assert!(matches!(result, ThrottleResult::Exhausted(_)));
let result = policy.on_throttle(&idempotent_state(Instant::now()), too_many_requests());
assert!(matches!(result, ThrottleResult::Exhausted(_)));
let result = policy.remaining_time(&idempotent_state(Instant::now()));
assert!(result.is_none(), "{result:?}");
}
}