use std::future::Future;
use camel_api::CamelError;
use camel_component_api::{NetworkRetryPolicy, retry_async};
use tonic::transport::Channel;
use tonic::{Code, Status};
use tracing::warn;
pub fn is_retryable_tonic_status(status: &tonic::Status) -> bool {
matches!(
status.code(),
Code::Unavailable | Code::DeadlineExceeded | Code::ResourceExhausted | Code::Aborted
)
}
/// Converts a gRPC [`Status`] into a [`CamelError`].
///
/// * `InvalidArgument` becomes [`CamelError::Config`].
/// * Every other code becomes [`CamelError::ProcessorError`] with a
///   `grpc[...]` prefix. `Unavailable` and `DeadlineExceeded` additionally
///   carry a `[TRANSIENT]` tag; codes without a dedicated arm use the code's
///   `Debug` rendering inside the brackets.
///
/// The status message is appended to the error text in all cases.
pub(crate) fn tonic_to_camel_error(status: Status) -> CamelError {
    let detail = status.message();
    let code = status.code();

    // The only code that does not map to a processor error.
    if code == Code::InvalidArgument {
        return CamelError::Config(format!("grpc invalid argument: {detail}"));
    }

    let text = match code {
        Code::NotFound => format!("grpc[NOT_FOUND]: {detail}"),
        Code::Unavailable => format!("grpc[TRANSIENT][UNAVAILABLE]: {detail}"),
        Code::DeadlineExceeded => format!("grpc[TRANSIENT][DEADLINE_EXCEEDED]: {detail}"),
        other => format!("grpc[{other:?}]: {detail}"),
    };
    CamelError::ProcessorError(text)
}
/// Drives `rpc_call` through `retry_async` and maps a final failure to a
/// [`CamelError`].
///
/// # Parameters
/// * `channel` - transport channel; it is cloned into a new
///   `tonic::client::Grpc` client for every invocation of `rpc_call`.
/// * `retry` - retry policy handed to `retry_async`.
/// * `kind` - label interpolated into the warning logged on failure.
/// * `rpc_call` - closure that performs one RPC with the supplied client.
///
/// # Errors
/// When `retry_async` yields an `Err`, a warning containing the status code
/// and `kind` is logged and the status is converted with
/// [`tonic_to_camel_error`]. [`is_retryable_tonic_status`] is passed to
/// `retry_async` as the retryability predicate.
pub(crate) async fn retry_rpc<T, F, Fut>(
    channel: Channel,
    retry: &NetworkRetryPolicy,
    kind: &str,
    mut rpc_call: F,
) -> Result<T, CamelError>
where
    F: FnMut(tonic::client::Grpc<Channel>) -> Fut,
    Fut: Future<Output = Result<T, tonic::Status>>,
{
    // One invocation = one fresh client built from a clone of the channel.
    let attempt = || rpc_call(tonic::client::Grpc::new(channel.clone()));

    retry_async::<T, _, _, _, tonic::Status>(
        retry,
        Some("grpc-producer"),
        attempt,
        is_retryable_tonic_status,
    )
    .await
    .map_err(|status| {
        warn!(
            code = %status.code(),
            "grpc {} call failed (retries exhausted or non-retryable)",
            kind
        );
        tonic_to_camel_error(status)
    })
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    use super::{is_retryable_tonic_status, retry_rpc, tonic_to_camel_error};
    use camel_api::CamelError;
    use camel_component_api::NetworkRetryPolicy;
    use tonic::Status;
    use tonic::transport::{Channel, Endpoint};

    /// Base policy for the retry tests: enabled, with millisecond-scale delays.
    /// Callers override `max_attempts` via struct-update syntax.
    fn fast_policy() -> NetworkRetryPolicy {
        NetworkRetryPolicy {
            enabled: true,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(5),
            ..NetworkRetryPolicy::default()
        }
    }

    /// Lazily-connecting channel; the test closures ignore the client built from it.
    fn lazy_channel() -> Channel {
        Endpoint::from_static("http://127.0.0.1:1").connect_lazy()
    }

    /// The four transient codes must be reported as retryable.
    #[test]
    fn is_retryable_tonic_status_transient_codes() {
        let transient = [
            Status::unavailable("down"),
            Status::deadline_exceeded("timeout"),
            Status::resource_exhausted("rate limit"),
            Status::aborted("conflict"),
        ];
        for status in &transient {
            assert!(
                is_retryable_tonic_status(status),
                "expected {:?} to be retryable",
                status.code()
            );
        }
    }

    /// Every other code exercised here must be reported as non-retryable.
    #[test]
    fn is_retryable_tonic_status_permanent_codes() {
        let permanent = [
            Status::invalid_argument("bad"),
            Status::not_found("missing"),
            Status::permission_denied("no"),
            Status::unauthenticated("who"),
            Status::already_exists("dup"),
            Status::failed_precondition("state"),
            Status::out_of_range("oob"),
            Status::unimplemented("noimpl"),
            Status::internal("oops"),
            Status::data_loss("lost"),
            Status::unknown("?"),
            Status::cancelled("stopped"),
        ];
        for status in &permanent {
            assert!(
                !is_retryable_tonic_status(status),
                "expected {:?} to be non-retryable",
                status.code()
            );
        }
    }

    /// An always-failing transient call is invoked exactly `max_attempts` times.
    #[tokio::test]
    async fn retry_attempts_count_matches_policy_max_attempts() {
        let policy = NetworkRetryPolicy {
            max_attempts: 3,
            ..fast_policy()
        };
        let attempts = Arc::new(AtomicU32::new(0));
        let counter = Arc::clone(&attempts);

        let outcome = retry_rpc::<(), _, _>(lazy_channel(), &policy, "test", |_grpc| {
            // Each attempt's future needs its own handle to the shared counter.
            let per_attempt = Arc::clone(&counter);
            async move {
                per_attempt.fetch_add(1, Ordering::SeqCst);
                Err(Status::unavailable("simulated transient failure"))
            }
        })
        .await;

        assert!(outcome.is_err());
        assert_eq!(attempts.load(Ordering::SeqCst), 3);
    }

    /// Once retries run out, the transient status surfaces as a tagged `ProcessorError`.
    #[tokio::test]
    async fn retry_rpc_returns_processor_error_on_exhausted_transient() {
        let policy = NetworkRetryPolicy {
            max_attempts: 2,
            ..fast_policy()
        };

        let err = retry_rpc::<(), _, _>(lazy_channel(), &policy, "test", |_grpc| async move {
            Err(Status::unavailable("simulated transient failure"))
        })
        .await
        .unwrap_err();

        assert!(
            matches!(err, CamelError::ProcessorError(_)),
            "expected ProcessorError, got {:?}",
            err
        );
        let rendered = err.to_string();
        assert!(
            rendered.contains("grpc[TRANSIENT][UNAVAILABLE]"),
            "expected error to contain transient marker, got: {}",
            err
        );
    }

    /// `Unavailable` maps to a transient-tagged `ProcessorError` that keeps the message.
    #[test]
    fn test_tonic_to_camel_error_unavailable() {
        let err = tonic_to_camel_error(Status::unavailable("service down"));
        assert!(matches!(err, CamelError::ProcessorError(_)));

        let rendered = err.to_string();
        assert!(rendered.contains("grpc[TRANSIENT][UNAVAILABLE]"));
        assert!(rendered.contains("service down"));
    }

    /// `NotFound` maps to a `ProcessorError` tagged `NOT_FOUND` that keeps the message.
    #[test]
    fn test_tonic_to_camel_error_not_found() {
        let err = tonic_to_camel_error(Status::not_found("method not found"));
        assert!(matches!(err, CamelError::ProcessorError(_)));

        let rendered = err.to_string();
        assert!(rendered.contains("grpc[NOT_FOUND]"));
        assert!(rendered.contains("method not found"));
    }

    /// `DeadlineExceeded` maps to a transient-tagged `ProcessorError`.
    #[test]
    fn test_tonic_to_camel_error_deadline_exceeded() {
        let err = tonic_to_camel_error(Status::deadline_exceeded("timeout"));
        assert!(matches!(err, CamelError::ProcessorError(_)));

        let rendered = err.to_string();
        assert!(rendered.contains("grpc[TRANSIENT][DEADLINE_EXCEEDED]"));
    }

    /// `InvalidArgument` is the one code that maps to `CamelError::Config`.
    #[test]
    fn test_tonic_to_camel_error_invalid_argument_maps_to_config() {
        let err = tonic_to_camel_error(Status::invalid_argument("bad arg"));
        assert!(matches!(err, CamelError::Config(_)));
        assert!(err.to_string().contains("grpc invalid argument"));
    }

    /// Codes without a dedicated arm still become `grpc[...]`-prefixed `ProcessorError`s.
    #[test]
    fn test_tonic_to_camel_error_various_codes() {
        let statuses = [
            Status::permission_denied("no access"),
            Status::resource_exhausted("too many"),
            Status::failed_precondition("bad state"),
            Status::aborted("conflict"),
            Status::out_of_range("oob"),
            Status::unimplemented("no impl"),
            Status::internal("oops"),
            Status::data_loss("lost"),
            Status::unauthenticated("who are you"),
        ];
        for err in statuses.map(tonic_to_camel_error) {
            assert!(matches!(err, CamelError::ProcessorError(_)));
            assert!(err.to_string().contains("grpc["));
        }
    }
}