telltale-runtime 17.0.0

Choreographic programming for Telltale - effect-based distributed protocols
Documentation
// Fault injection middleware for testing
//
// Allows injecting failures and delays for chaos engineering and testing.

#[cfg(feature = "test-utils")]
use async_trait::async_trait;
#[cfg(feature = "test-utils")]
use cfg_if::cfg_if;
#[cfg(feature = "test-utils")]
use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "test-utils")]
use std::time::Duration;

#[cfg(feature = "test-utils")]
use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};

/// Fault injection middleware for testing
#[cfg(feature = "test-utils")]
pub struct FaultInjection<H> {
    inner: H,
    failure_rate: f32,
    delay_range: Option<(Duration, Duration)>,
    rng: rand::rngs::StdRng,
}

#[cfg(feature = "test-utils")]
impl<H> FaultInjection<H> {
    pub fn new(inner: H, failure_rate: f32) -> Self {
        use rand::SeedableRng;
        Self {
            inner,
            failure_rate,
            delay_range: None,
            rng: rand::rngs::StdRng::from_entropy(),
        }
    }

    pub fn with_seed(inner: H, failure_rate: f32, seed: u64) -> Self {
        use rand::SeedableRng;
        Self {
            inner,
            failure_rate,
            delay_range: None,
            rng: rand::rngs::StdRng::seed_from_u64(seed),
        }
    }

    pub fn with_delays(mut self, min: Duration, max: Duration) -> Self {
        self.delay_range = Some((min, max));
        self
    }
}

#[cfg(feature = "test-utils")]
#[async_trait]
impl<H: ChoreoHandler + Send> ChoreoHandler for FaultInjection<H> {
    type Role = H::Role;
    type Endpoint = H::Endpoint;

    async fn send<M: Serialize + Send + Sync>(
        &mut self,
        ep: &mut Self::Endpoint,
        to: Self::Role,
        msg: &M,
    ) -> ChoreoResult<()> {
        use rand::Rng;

        // Inject random delay
        if let Some((min, max)) = self.delay_range {
            let delay_ms = self.rng.gen_range(min.as_millis()..=max.as_millis());
            let delay = Duration::from_millis(u64::try_from(delay_ms).unwrap_or(u64::MAX));

            cfg_if! {
                if #[cfg(target_arch = "wasm32")] {
                    wasm_timer::Delay::new(delay).await.ok();
                } else {
                    tokio::time::sleep(delay).await;
                }
            }
        }

        // Inject random failure
        if self.rng.gen::<f32>() < self.failure_rate {
            return Err(crate::effects::ChoreographyError::Transport(
                "Injected fault".into(),
            ));
        }

        self.inner.send(ep, to, msg).await
    }

    async fn recv<M: DeserializeOwned + Send>(
        &mut self,
        ep: &mut Self::Endpoint,
        from: Self::Role,
    ) -> ChoreoResult<M> {
        self.inner.recv(ep, from).await
    }

    async fn choose(
        &mut self,
        ep: &mut Self::Endpoint,
        who: Self::Role,
        label: <Self::Role as RoleId>::Label,
    ) -> ChoreoResult<()> {
        self.inner.choose(ep, who, label).await
    }

    async fn offer(
        &mut self,
        ep: &mut Self::Endpoint,
        from: Self::Role,
    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
        self.inner.offer(ep, from).await
    }

    async fn with_timeout<F, T>(
        &mut self,
        ep: &mut Self::Endpoint,
        at: Self::Role,
        dur: Duration,
        body: F,
    ) -> ChoreoResult<T>
    where
        F: std::future::Future<Output = ChoreoResult<T>> + Send,
    {
        self.inner.with_timeout(ep, at, dur, body).await
    }
}