#![allow(clippy::unwrap_used, unreachable_pub)]
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use entelix_core::error::Error;
use entelix_core::transports::RetryPolicy;
use entelix_core::{ExecutionContext, Result, TenantId};
use entelix_runnable::{Runnable, RunnableExt, RunnableLambda};
#[allow(dead_code)]
mod _shim {
pub use entelix_core::backoff::ExponentialBackoff;
}
struct Flaky {
fails_remaining: AtomicU32,
calls: Arc<AtomicU32>,
}
impl Flaky {
fn new(fail_count: u32) -> (Self, Arc<AtomicU32>) {
let calls = Arc::new(AtomicU32::new(0));
(
Self {
fails_remaining: AtomicU32::new(fail_count),
calls: calls.clone(),
},
calls,
)
}
}
#[async_trait]
impl Runnable<u32, u32> for Flaky {
async fn invoke(&self, input: u32, _ctx: &ExecutionContext) -> Result<u32> {
self.calls.fetch_add(1, Ordering::SeqCst);
let remaining = self.fails_remaining.load(Ordering::SeqCst);
if remaining > 0 {
self.fails_remaining.fetch_sub(1, Ordering::SeqCst);
return Err(Error::provider_http(503, "transient"));
}
Ok(input * 2)
}
}
struct AlwaysAuthError;
#[async_trait]
impl Runnable<u32, u32> for AlwaysAuthError {
async fn invoke(&self, _input: u32, _ctx: &ExecutionContext) -> Result<u32> {
Err(Error::provider_http(401, "unauthorised"))
}
}
fn fast_policy(max_attempts: u32) -> RetryPolicy {
RetryPolicy::standard()
.with_max_attempts(max_attempts)
.with_backoff(_shim::ExponentialBackoff::new(
Duration::from_millis(1),
Duration::from_millis(2),
))
}
#[tokio::test]
async fn with_retry_succeeds_after_transient_failures() {
let (flaky, calls) = Flaky::new(2);
let resilient = flaky.with_retry(fast_policy(5));
let out = resilient
.invoke(21, &ExecutionContext::new())
.await
.unwrap();
assert_eq!(out, 42);
assert_eq!(calls.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn with_retry_exhausts_attempts_and_returns_last_error() {
let (flaky, calls) = Flaky::new(10);
let resilient = flaky.with_retry(fast_policy(3));
let err = resilient
.invoke(0, &ExecutionContext::new())
.await
.unwrap_err();
assert!(matches!(
err,
Error::Provider {
kind: entelix_core::ProviderErrorKind::Http(503),
..
}
));
assert_eq!(calls.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn with_retry_does_not_retry_permanent_errors() {
let resilient = AlwaysAuthError.with_retry(fast_policy(5));
let err = resilient
.invoke(0, &ExecutionContext::new())
.await
.unwrap_err();
assert!(matches!(
err,
Error::Provider {
kind: entelix_core::ProviderErrorKind::Http(401),
..
}
));
}
#[tokio::test]
async fn with_fallbacks_uses_secondary_when_primary_is_transient() {
let (primary, primary_calls) = Flaky::new(10);
let (secondary, secondary_calls) = Flaky::new(0);
let resilient = primary.with_fallbacks(vec![secondary]);
let out = resilient.invoke(7, &ExecutionContext::new()).await.unwrap();
assert_eq!(out, 14);
assert_eq!(primary_calls.load(Ordering::SeqCst), 1);
assert_eq!(secondary_calls.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn with_fallbacks_does_not_consult_secondary_on_permanent_error() {
let (secondary, secondary_calls) = Flaky::new(0);
let resilient = AlwaysAuthError.with_fallbacks(vec![secondary]);
let err = resilient
.invoke(0, &ExecutionContext::new())
.await
.unwrap_err();
assert!(matches!(
err,
Error::Provider {
kind: entelix_core::ProviderErrorKind::Http(401),
..
}
));
assert_eq!(
secondary_calls.load(Ordering::SeqCst),
0,
"secondary must NOT be consulted on permanent errors"
);
}
#[tokio::test]
async fn map_transforms_inner_output() {
let inner = RunnableLambda::new(|n: u32, _ctx| async move { Ok::<_, _>(n + 1) });
let mapped = inner.map(|n: u32| format!("value: {n}"));
let out = mapped.invoke(41, &ExecutionContext::new()).await.unwrap();
assert_eq!(out, "value: 42");
}
#[tokio::test]
async fn with_config_does_not_mutate_caller_context() {
let inner = RunnableLambda::new(|_input: (), ctx: ExecutionContext| async move {
Ok::<_, _>(ctx.tenant_id().to_owned())
});
let configured = inner.with_config(|ctx| {
*ctx = ctx.clone().with_tenant_id(TenantId::new("override-tenant"));
});
let parent = ExecutionContext::new().with_tenant_id(TenantId::new("parent-tenant"));
let observed = configured.invoke((), &parent).await.unwrap();
assert_eq!(observed, "override-tenant");
assert_eq!(parent.tenant_id(), "parent-tenant");
}
#[tokio::test]
async fn with_timeout_returns_deadline_exceeded_when_inner_is_slow() {
let slow = RunnableLambda::new(|_input: (), _ctx| async move {
tokio::time::sleep(Duration::from_mins(1)).await;
Ok::<_, _>(())
});
let bounded = slow.with_timeout(Duration::from_millis(20));
let err = bounded
.invoke((), &ExecutionContext::new())
.await
.unwrap_err();
assert!(matches!(err, Error::DeadlineExceeded));
}
#[tokio::test]
async fn with_timeout_passes_through_when_inner_completes_in_time() {
let fast = RunnableLambda::new(|n: u32, _ctx| async move { Ok::<_, _>(n + 1) });
let bounded = fast.with_timeout(Duration::from_secs(5));
let out = bounded.invoke(41, &ExecutionContext::new()).await.unwrap();
assert_eq!(out, 42);
}
#[tokio::test]
async fn with_timeout_honours_caller_cancellation_over_timeout() {
let slow = RunnableLambda::new(|_input: (), _ctx| async move {
tokio::time::sleep(Duration::from_mins(1)).await;
Ok::<_, _>(())
});
let bounded = slow.with_timeout(Duration::from_mins(1));
let token = entelix_core::cancellation::CancellationToken::new();
let ctx = ExecutionContext::with_cancellation(token.clone());
let handle = tokio::spawn(async move { bounded.invoke((), &ctx).await });
tokio::time::sleep(Duration::from_millis(20)).await;
token.cancel();
let err = handle.await.unwrap().unwrap_err();
assert!(matches!(err, Error::Cancelled));
}