Skip to main content

atomr_remote/transport/
failure_injector.rs

1//! Failure-injecting transport adapter.
2//! akka.net: `Remote/Transport/FailureInjectorTransportAdapter.cs`.
3//!
4//! Useful in tests for verifying timeout / retry / quarantine paths.
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use parking_lot::RwLock;
10use tokio::sync::mpsc;
11
12use atomr_core::actor::Address;
13
14use crate::pdu::AkkaPdu;
15
16use super::{InboundFrame, Transport, TransportError};
17
/// Fault behaviour that [`FailureInjectorTransport`] applies to outgoing sends.
///
/// Modes compare by value (`PartialEq`/`Eq`), so tests can assert on them
/// directly with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InjectionMode {
    /// Pass through unchanged.
    Pass,
    /// Drop every nth send (n>=1; 1 drops everything).
    ///
    /// Sends are counted from zero and a send is dropped when its count is a
    /// multiple of `n`, so the first counted send is always dropped. A dropped
    /// send is still reported to the caller as `Ok(())`. `DropEvery(0)` drops
    /// nothing and behaves like [`InjectionMode::Pass`].
    DropEvery(u32),
    /// Reply with a fixed [`TransportError`] on every send.
    ///
    /// The contained string becomes the message of `TransportError::Other`;
    /// nothing is forwarded to the wrapped transport.
    Fail(String),
}
27
/// [`Transport`] wrapper that forwards to an inner transport while optionally
/// dropping or failing outgoing sends according to an [`InjectionMode`].
pub struct FailureInjectorTransport {
    /// Wrapped transport that every call is delegated to.
    inner: Arc<dyn Transport>,
    /// Currently active injection mode; replaced at runtime via `set_mode`.
    mode: Arc<RwLock<InjectionMode>>,
    /// Count of sends observed under `DropEvery(n)` with `n >= 1`; used to
    /// decide which sends to drop.
    counter: Arc<std::sync::atomic::AtomicU32>,
}
33
34impl FailureInjectorTransport {
35    pub fn new(inner: Arc<dyn Transport>, mode: InjectionMode) -> Arc<Self> {
36        Arc::new(Self {
37            inner,
38            mode: Arc::new(RwLock::new(mode)),
39            counter: Arc::new(std::sync::atomic::AtomicU32::new(0)),
40        })
41    }
42
43    pub fn set_mode(&self, mode: InjectionMode) {
44        *self.mode.write() = mode;
45    }
46}
47
48#[async_trait]
49impl Transport for FailureInjectorTransport {
50    async fn listen(&self) -> Result<Address, TransportError> {
51        self.inner.listen().await
52    }
53
54    async fn associate(&self, target: &Address) -> Result<(), TransportError> {
55        self.inner.associate(target).await
56    }
57
58    async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError> {
59        let mode = self.mode.read().clone();
60        match mode {
61            InjectionMode::Pass => self.inner.send(target, pdu).await,
62            InjectionMode::DropEvery(n) if n >= 1 => {
63                let i = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
64                if i % n == 0 {
65                    Ok(())
66                } else {
67                    self.inner.send(target, pdu).await
68                }
69            }
70            InjectionMode::DropEvery(_) => self.inner.send(target, pdu).await,
71            InjectionMode::Fail(msg) => Err(TransportError::Other(msg)),
72        }
73    }
74
75    fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame> {
76        self.inner.inbound()
77    }
78
79    async fn disassociate(&self, target: &Address) -> Result<(), TransportError> {
80        self.inner.disassociate(target).await
81    }
82
83    async fn shutdown(&self) -> Result<(), TransportError> {
84        self.inner.shutdown().await
85    }
86}