atomr_remote/transport/
throttle.rs1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use tokio::sync::mpsc;
10
11use atomr_core::actor::Address;
12
13use crate::pdu::AkkaPdu;
14
15use super::{InboundFrame, Transport, TransportError};
16
/// Policy a [`ThrottleTransport`] applies to outbound PDUs.
///
/// The type is `Copy`, so the current mode can be read out of the transport
/// cheaply, and `PartialEq`/`Eq`, so it can be compared directly (for
/// example in tests asserting which mode is active).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThrottleMode {
    /// Forward every outbound PDU to the wrapped transport immediately.
    Unthrottled,
    /// Wait for the given duration before forwarding each outbound PDU.
    Latency(Duration),
    /// Discard every outbound PDU while still reporting success to the caller.
    Blackhole,
}
26
27pub struct ThrottleTransport {
28 inner: Arc<dyn Transport>,
29 mode: Arc<RwLock<ThrottleMode>>,
30}
31
32impl ThrottleTransport {
33 pub fn new(inner: Arc<dyn Transport>, mode: ThrottleMode) -> Arc<Self> {
34 Arc::new(Self { inner, mode: Arc::new(RwLock::new(mode)) })
35 }
36
37 pub fn set_mode(&self, mode: ThrottleMode) {
38 *self.mode.write() = mode;
39 }
40
41 pub fn mode(&self) -> ThrottleMode {
42 *self.mode.read()
43 }
44}
45
46#[async_trait]
47impl Transport for ThrottleTransport {
48 async fn listen(&self) -> Result<Address, TransportError> {
49 self.inner.listen().await
50 }
51
52 async fn associate(&self, target: &Address) -> Result<(), TransportError> {
53 self.inner.associate(target).await
54 }
55
56 async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError> {
57 match self.mode() {
58 ThrottleMode::Unthrottled => self.inner.send(target, pdu).await,
59 ThrottleMode::Latency(d) => {
60 tokio::time::sleep(d).await;
61 self.inner.send(target, pdu).await
62 }
63 ThrottleMode::Blackhole => Ok(()),
64 }
65 }
66
67 fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame> {
68 self.inner.inbound()
69 }
70
71 async fn disassociate(&self, target: &Address) -> Result<(), TransportError> {
72 self.inner.disassociate(target).await
73 }
74
75 async fn shutdown(&self) -> Result<(), TransportError> {
76 self.inner.shutdown().await
77 }
78}