entelix-runnable 0.5.5

entelix composition contract — Runnable<I, O> trait, LCEL pipe, Sequence/Parallel/Router/Lambda
Documentation
//! `RunnableExt` composition adapters — `with_retry`,
//! `with_fallbacks`, `map`, `with_config`, `with_timeout`.

#![allow(clippy::unwrap_used, unreachable_pub)]

use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use async_trait::async_trait;
use entelix_core::error::Error;
use entelix_core::transports::RetryPolicy;
use entelix_core::{ExecutionContext, Result, TenantId};
use entelix_runnable::{Runnable, RunnableExt, RunnableLambda};

// `ExponentialBackoff` lives in entelix_core::backoff; the marker
// above is a deliberate compile-time forcing-function that fails if
// someone accidentally re-imports backoff through transports.
#[allow(dead_code)]
mod _shim {
    pub use entelix_core::backoff::ExponentialBackoff;
}

/// Runnable that fails N times then succeeds, recording each call.
struct Flaky {
    fails_remaining: AtomicU32,
    calls: Arc<AtomicU32>,
}

impl Flaky {
    fn new(fail_count: u32) -> (Self, Arc<AtomicU32>) {
        let calls = Arc::new(AtomicU32::new(0));
        (
            Self {
                fails_remaining: AtomicU32::new(fail_count),
                calls: calls.clone(),
            },
            calls,
        )
    }
}

#[async_trait]
impl Runnable<u32, u32> for Flaky {
    async fn invoke(&self, input: u32, _ctx: &ExecutionContext) -> Result<u32> {
        self.calls.fetch_add(1, Ordering::SeqCst);
        let remaining = self.fails_remaining.load(Ordering::SeqCst);
        if remaining > 0 {
            self.fails_remaining.fetch_sub(1, Ordering::SeqCst);
            return Err(Error::provider_http(503, "transient"));
        }
        Ok(input * 2)
    }
}

/// Runnable that always returns a permanent error.
struct AlwaysAuthError;

#[async_trait]
impl Runnable<u32, u32> for AlwaysAuthError {
    async fn invoke(&self, _input: u32, _ctx: &ExecutionContext) -> Result<u32> {
        Err(Error::provider_http(401, "unauthorised"))
    }
}

fn fast_policy(max_attempts: u32) -> RetryPolicy {
    RetryPolicy::standard()
        .with_max_attempts(max_attempts)
        .with_backoff(_shim::ExponentialBackoff::new(
            Duration::from_millis(1),
            Duration::from_millis(2),
        ))
}

#[tokio::test]
async fn with_retry_succeeds_after_transient_failures() {
    let (flaky, calls) = Flaky::new(2);
    let resilient = flaky.with_retry(fast_policy(5));
    let out = resilient
        .invoke(21, &ExecutionContext::new())
        .await
        .unwrap();
    assert_eq!(out, 42);
    // 1 initial + 2 retries.
    assert_eq!(calls.load(Ordering::SeqCst), 3);
}

#[tokio::test]
async fn with_retry_exhausts_attempts_and_returns_last_error() {
    let (flaky, calls) = Flaky::new(10);
    let resilient = flaky.with_retry(fast_policy(3));
    let err = resilient
        .invoke(0, &ExecutionContext::new())
        .await
        .unwrap_err();
    assert!(matches!(
        err,
        Error::Provider {
            kind: entelix_core::ProviderErrorKind::Http(503),
            ..
        }
    ));
    assert_eq!(calls.load(Ordering::SeqCst), 3);
}

#[tokio::test]
async fn with_retry_does_not_retry_permanent_errors() {
    let resilient = AlwaysAuthError.with_retry(fast_policy(5));
    let err = resilient
        .invoke(0, &ExecutionContext::new())
        .await
        .unwrap_err();
    assert!(matches!(
        err,
        Error::Provider {
            kind: entelix_core::ProviderErrorKind::Http(401),
            ..
        }
    ));
    // No retry on 401 — single call only would be expected, but we
    // can't observe call count here. The outcome itself is the
    // assertion: a 401 surfaces the same as a non-retried call.
}

#[tokio::test]
async fn with_fallbacks_uses_secondary_when_primary_is_transient() {
    let (primary, primary_calls) = Flaky::new(10);
    let (secondary, secondary_calls) = Flaky::new(0);
    let resilient = primary.with_fallbacks(vec![secondary]);
    let out = resilient.invoke(7, &ExecutionContext::new()).await.unwrap();
    assert_eq!(out, 14);
    assert_eq!(primary_calls.load(Ordering::SeqCst), 1);
    assert_eq!(secondary_calls.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn with_fallbacks_does_not_consult_secondary_on_permanent_error() {
    let (secondary, secondary_calls) = Flaky::new(0);
    let resilient = AlwaysAuthError.with_fallbacks(vec![secondary]);
    let err = resilient
        .invoke(0, &ExecutionContext::new())
        .await
        .unwrap_err();
    assert!(matches!(
        err,
        Error::Provider {
            kind: entelix_core::ProviderErrorKind::Http(401),
            ..
        }
    ));
    assert_eq!(
        secondary_calls.load(Ordering::SeqCst),
        0,
        "secondary must NOT be consulted on permanent errors"
    );
}

#[tokio::test]
async fn map_transforms_inner_output() {
    let inner = RunnableLambda::new(|n: u32, _ctx| async move { Ok::<_, _>(n + 1) });
    let mapped = inner.map(|n: u32| format!("value: {n}"));
    let out = mapped.invoke(41, &ExecutionContext::new()).await.unwrap();
    assert_eq!(out, "value: 42");
}

#[tokio::test]
async fn with_config_does_not_mutate_caller_context() {
    let inner = RunnableLambda::new(|_input: (), ctx: ExecutionContext| async move {
        Ok::<_, _>(ctx.tenant_id().to_owned())
    });
    let configured = inner.with_config(|ctx| {
        *ctx = ctx.clone().with_tenant_id(TenantId::new("override-tenant"));
    });
    let parent = ExecutionContext::new().with_tenant_id(TenantId::new("parent-tenant"));
    let observed = configured.invoke((), &parent).await.unwrap();
    assert_eq!(observed, "override-tenant");
    // The parent's tenant_id is unchanged.
    assert_eq!(parent.tenant_id(), "parent-tenant");
}

#[tokio::test]
async fn with_timeout_returns_deadline_exceeded_when_inner_is_slow() {
    let slow = RunnableLambda::new(|_input: (), _ctx| async move {
        tokio::time::sleep(Duration::from_mins(1)).await;
        Ok::<_, _>(())
    });
    let bounded = slow.with_timeout(Duration::from_millis(20));
    let err = bounded
        .invoke((), &ExecutionContext::new())
        .await
        .unwrap_err();
    assert!(matches!(err, Error::DeadlineExceeded));
}

#[tokio::test]
async fn with_timeout_passes_through_when_inner_completes_in_time() {
    let fast = RunnableLambda::new(|n: u32, _ctx| async move { Ok::<_, _>(n + 1) });
    let bounded = fast.with_timeout(Duration::from_secs(5));
    let out = bounded.invoke(41, &ExecutionContext::new()).await.unwrap();
    assert_eq!(out, 42);
}

#[tokio::test]
async fn with_timeout_honours_caller_cancellation_over_timeout() {
    let slow = RunnableLambda::new(|_input: (), _ctx| async move {
        tokio::time::sleep(Duration::from_mins(1)).await;
        Ok::<_, _>(())
    });
    let bounded = slow.with_timeout(Duration::from_mins(1));
    let token = entelix_core::cancellation::CancellationToken::new();
    let ctx = ExecutionContext::with_cancellation(token.clone());
    let handle = tokio::spawn(async move { bounded.invoke((), &ctx).await });
    tokio::time::sleep(Duration::from_millis(20)).await;
    token.cancel();
    let err = handle.await.unwrap().unwrap_err();
    assert!(matches!(err, Error::Cancelled));
}