Skip to main content

atomr_remote/transport/
throttle.rs

1//! Rate-limiting / blackholing transport adapter.
2//! akka.net: `Remote/Transport/ThrottleTransportAdapter.cs`.
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use tokio::sync::mpsc;
10
11use atomr_core::actor::Address;
12
13use crate::pdu::AkkaPdu;
14
15use super::{InboundFrame, Transport, TransportError};
16
17#[derive(Debug, Clone, Copy)]
18pub enum ThrottleMode {
19    /// Pass through unchanged.
20    Unthrottled,
21    /// Inject this many ms of latency on every PDU.
22    Latency(Duration),
23    /// Drop every PDU silently (used for partition simulation).
24    Blackhole,
25}
26
27pub struct ThrottleTransport {
28    inner: Arc<dyn Transport>,
29    mode: Arc<RwLock<ThrottleMode>>,
30}
31
32impl ThrottleTransport {
33    pub fn new(inner: Arc<dyn Transport>, mode: ThrottleMode) -> Arc<Self> {
34        Arc::new(Self { inner, mode: Arc::new(RwLock::new(mode)) })
35    }
36
37    pub fn set_mode(&self, mode: ThrottleMode) {
38        *self.mode.write() = mode;
39    }
40
41    pub fn mode(&self) -> ThrottleMode {
42        *self.mode.read()
43    }
44}
45
46#[async_trait]
47impl Transport for ThrottleTransport {
48    async fn listen(&self) -> Result<Address, TransportError> {
49        self.inner.listen().await
50    }
51
52    async fn associate(&self, target: &Address) -> Result<(), TransportError> {
53        self.inner.associate(target).await
54    }
55
56    async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError> {
57        match self.mode() {
58            ThrottleMode::Unthrottled => self.inner.send(target, pdu).await,
59            ThrottleMode::Latency(d) => {
60                tokio::time::sleep(d).await;
61                self.inner.send(target, pdu).await
62            }
63            ThrottleMode::Blackhole => Ok(()),
64        }
65    }
66
67    fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame> {
68        self.inner.inbound()
69    }
70
71    async fn disassociate(&self, target: &Address) -> Result<(), TransportError> {
72        self.inner.disassociate(target).await
73    }
74
75    async fn shutdown(&self) -> Result<(), TransportError> {
76        self.inner.shutdown().await
77    }
78}