Skip to main content

atomr_remote/transport/
throttle.rs

1//! Rate-limiting / blackholing transport adapter.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use tokio::sync::mpsc;
9
10use atomr_core::actor::Address;
11
12use crate::pdu::AkkaPdu;
13
14use super::{InboundFrame, Transport, TransportError};
15
/// How a [`ThrottleTransport`] treats outbound PDUs.
///
/// The type is `Copy` and comparable so the current mode can be read out,
/// stored, and checked with `==` / `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThrottleMode {
    /// Forward every PDU to the inner transport unchanged.
    Unthrottled,
    /// Wait for this [`Duration`] before forwarding each PDU to the inner
    /// transport. The delay is whatever the duration holds; it is not
    /// restricted to whole milliseconds.
    Latency(Duration),
    /// Drop every outbound PDU silently: the send reports success but nothing
    /// is forwarded (used for partition simulation).
    Blackhole,
}
25
26pub struct ThrottleTransport {
27    inner: Arc<dyn Transport>,
28    mode: Arc<RwLock<ThrottleMode>>,
29}
30
31impl ThrottleTransport {
32    pub fn new(inner: Arc<dyn Transport>, mode: ThrottleMode) -> Arc<Self> {
33        Arc::new(Self { inner, mode: Arc::new(RwLock::new(mode)) })
34    }
35
36    pub fn set_mode(&self, mode: ThrottleMode) {
37        *self.mode.write() = mode;
38    }
39
40    pub fn mode(&self) -> ThrottleMode {
41        *self.mode.read()
42    }
43}
44
45#[async_trait]
46impl Transport for ThrottleTransport {
47    async fn listen(&self) -> Result<Address, TransportError> {
48        self.inner.listen().await
49    }
50
51    async fn associate(&self, target: &Address) -> Result<(), TransportError> {
52        self.inner.associate(target).await
53    }
54
55    async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError> {
56        match self.mode() {
57            ThrottleMode::Unthrottled => self.inner.send(target, pdu).await,
58            ThrottleMode::Latency(d) => {
59                tokio::time::sleep(d).await;
60                self.inner.send(target, pdu).await
61            }
62            ThrottleMode::Blackhole => Ok(()),
63        }
64    }
65
66    fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame> {
67        self.inner.inbound()
68    }
69
70    async fn disassociate(&self, target: &Address) -> Result<(), TransportError> {
71        self.inner.disassociate(target).await
72    }
73
74    async fn shutdown(&self) -> Result<(), TransportError> {
75        self.inner.shutdown().await
76    }
77}