Skip to main content

atomr_remote/transport/
failure_injector.rs

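//! Failure-injecting transport adapter.
//!
//! Wraps another transport and, depending on the configured injection mode,
//! forwards, silently drops, or fails outgoing sends.
//! Useful in tests for verifying timeout / retry / quarantine paths.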
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use tokio::sync::mpsc;
10
11use atomr_core::actor::Address;
12
13use crate::pdu::AkkaPdu;
14
15use super::{InboundFrame, Transport, TransportError};
16
/// Policy applied to outgoing sends by the failure-injecting transport.
///
/// `PartialEq`/`Eq` are derived so tests can assert on the configured mode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InjectionMode {
    /// Pass through unchanged.
    Pass,
    /// Drop one out of every `n` sends.
    ///
    /// Sends made in this mode are counted from zero, and a send is dropped
    /// when its count is a multiple of `n`: the first counted send is dropped,
    /// then every `n`th one after it. `1` drops everything. `0` disables
    /// dropping and behaves like [`InjectionMode::Pass`].
    ///
    /// A dropped send is swallowed silently: the caller still gets `Ok(())`.
    DropEvery(u32),
    /// Fail every send with `TransportError::Other` carrying this message;
    /// nothing is forwarded to the wrapped transport.
    Fail(String),
}
26
27pub struct FailureInjectorTransport {
28    inner: Arc<dyn Transport>,
29    mode: Arc<RwLock<InjectionMode>>,
30    counter: Arc<std::sync::atomic::AtomicU32>,
31}
32
33impl FailureInjectorTransport {
34    pub fn new(inner: Arc<dyn Transport>, mode: InjectionMode) -> Arc<Self> {
35        Arc::new(Self {
36            inner,
37            mode: Arc::new(RwLock::new(mode)),
38            counter: Arc::new(std::sync::atomic::AtomicU32::new(0)),
39        })
40    }
41
42    pub fn set_mode(&self, mode: InjectionMode) {
43        *self.mode.write() = mode;
44    }
45}
46
47#[async_trait]
48impl Transport for FailureInjectorTransport {
49    async fn listen(&self) -> Result<Address, TransportError> {
50        self.inner.listen().await
51    }
52
53    async fn associate(&self, target: &Address) -> Result<(), TransportError> {
54        self.inner.associate(target).await
55    }
56
57    async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError> {
58        let mode = self.mode.read().clone();
59        match mode {
60            InjectionMode::Pass => self.inner.send(target, pdu).await,
61            InjectionMode::DropEvery(n) if n >= 1 => {
62                let i = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
63                if i % n == 0 {
64                    Ok(())
65                } else {
66                    self.inner.send(target, pdu).await
67                }
68            }
69            InjectionMode::DropEvery(_) => self.inner.send(target, pdu).await,
70            InjectionMode::Fail(msg) => Err(TransportError::Other(msg)),
71        }
72    }
73
74    fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame> {
75        self.inner.inbound()
76    }
77
78    async fn disassociate(&self, target: &Address) -> Result<(), TransportError> {
79        self.inner.disassociate(target).await
80    }
81
82    async fn shutdown(&self) -> Result<(), TransportError> {
83        self.inner.shutdown().await
84    }
85}