atomr_remote/transport/
failure_injector.rs1use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use tokio::sync::mpsc;
10
11use atomr_core::actor::Address;
12
13use crate::pdu::AkkaPdu;
14
15use super::{InboundFrame, Transport, TransportError};
16
/// Selects how [`FailureInjectorTransport`] treats outbound `send` calls.
///
/// Only `send` consults the mode; every other transport operation is
/// delegated to the wrapped transport unchanged.
///
/// `PartialEq`/`Eq` are derived so callers (and tests) can compare modes
/// directly, e.g. with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InjectionMode {
    /// Forward every send to the wrapped transport.
    Pass,
    /// With `n >= 1`: a send whose running counter value is a multiple of `n`
    /// is swallowed (reported as `Ok(())` without being forwarded); all other
    /// sends are forwarded. The counter starts at zero, so the first send
    /// counted in this mode is one of the dropped ones.
    ///
    /// With `n == 0`: nothing is dropped and every send is forwarded.
    DropEvery(u32),
    /// Reject every send with `TransportError::Other` carrying this message;
    /// nothing is forwarded to the wrapped transport.
    Fail(String),
}
26
27pub struct FailureInjectorTransport {
28 inner: Arc<dyn Transport>,
29 mode: Arc<RwLock<InjectionMode>>,
30 counter: Arc<std::sync::atomic::AtomicU32>,
31}
32
33impl FailureInjectorTransport {
34 pub fn new(inner: Arc<dyn Transport>, mode: InjectionMode) -> Arc<Self> {
35 Arc::new(Self {
36 inner,
37 mode: Arc::new(RwLock::new(mode)),
38 counter: Arc::new(std::sync::atomic::AtomicU32::new(0)),
39 })
40 }
41
42 pub fn set_mode(&self, mode: InjectionMode) {
43 *self.mode.write() = mode;
44 }
45}
46
47#[async_trait]
48impl Transport for FailureInjectorTransport {
49 async fn listen(&self) -> Result<Address, TransportError> {
50 self.inner.listen().await
51 }
52
53 async fn associate(&self, target: &Address) -> Result<(), TransportError> {
54 self.inner.associate(target).await
55 }
56
57 async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError> {
58 let mode = self.mode.read().clone();
59 match mode {
60 InjectionMode::Pass => self.inner.send(target, pdu).await,
61 InjectionMode::DropEvery(n) if n >= 1 => {
62 let i = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
63 if i % n == 0 {
64 Ok(())
65 } else {
66 self.inner.send(target, pdu).await
67 }
68 }
69 InjectionMode::DropEvery(_) => self.inner.send(target, pdu).await,
70 InjectionMode::Fail(msg) => Err(TransportError::Other(msg)),
71 }
72 }
73
74 fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame> {
75 self.inner.inbound()
76 }
77
78 async fn disassociate(&self, target: &Address) -> Result<(), TransportError> {
79 self.inner.disassociate(target).await
80 }
81
82 async fn shutdown(&self) -> Result<(), TransportError> {
83 self.inner.shutdown().await
84 }
85}