atomr_remote/transport/
throttle.rs1use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use tokio::sync::mpsc;
9
10use atomr_core::actor::Address;
11
12use crate::pdu::AkkaPdu;
13
14use super::{InboundFrame, Transport, TransportError};
15
16#[derive(Debug, Clone, Copy)]
17pub enum ThrottleMode {
18 Unthrottled,
20 Latency(Duration),
22 Blackhole,
24}
25
26pub struct ThrottleTransport {
27 inner: Arc<dyn Transport>,
28 mode: Arc<RwLock<ThrottleMode>>,
29}
30
31impl ThrottleTransport {
32 pub fn new(inner: Arc<dyn Transport>, mode: ThrottleMode) -> Arc<Self> {
33 Arc::new(Self { inner, mode: Arc::new(RwLock::new(mode)) })
34 }
35
36 pub fn set_mode(&self, mode: ThrottleMode) {
37 *self.mode.write() = mode;
38 }
39
40 pub fn mode(&self) -> ThrottleMode {
41 *self.mode.read()
42 }
43}
44
45#[async_trait]
46impl Transport for ThrottleTransport {
47 async fn listen(&self) -> Result<Address, TransportError> {
48 self.inner.listen().await
49 }
50
51 async fn associate(&self, target: &Address) -> Result<(), TransportError> {
52 self.inner.associate(target).await
53 }
54
55 async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError> {
56 match self.mode() {
57 ThrottleMode::Unthrottled => self.inner.send(target, pdu).await,
58 ThrottleMode::Latency(d) => {
59 tokio::time::sleep(d).await;
60 self.inner.send(target, pdu).await
61 }
62 ThrottleMode::Blackhole => Ok(()),
63 }
64 }
65
66 fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame> {
67 self.inner.inbound()
68 }
69
70 async fn disassociate(&self, target: &Address) -> Result<(), TransportError> {
71 self.inner.disassociate(target).await
72 }
73
74 async fn shutdown(&self) -> Result<(), TransportError> {
75 self.inner.shutdown().await
76 }
77}