chainrpc_core/
backpressure.rs1use std::sync::Arc;
7
8use async_trait::async_trait;
9use tokio::sync::Semaphore;
10
11use crate::error::TransportError;
12use crate::request::{JsonRpcRequest, JsonRpcResponse};
13use crate::transport::{HealthStatus, RpcTransport};
14
15#[derive(Debug, Clone)]
17pub struct BackpressureConfig {
18 pub max_in_flight: usize,
20}
21
22impl Default for BackpressureConfig {
23 fn default() -> Self {
24 Self {
25 max_in_flight: 1000,
26 }
27 }
28}
29
30pub struct BackpressureTransport {
35 inner: Arc<dyn RpcTransport>,
36 semaphore: Semaphore,
37 max_in_flight: usize,
38}
39
40impl BackpressureTransport {
41 pub fn new(inner: Arc<dyn RpcTransport>, config: BackpressureConfig) -> Self {
43 Self {
44 inner,
45 semaphore: Semaphore::new(config.max_in_flight),
46 max_in_flight: config.max_in_flight,
47 }
48 }
49
50 pub fn in_flight(&self) -> usize {
52 self.max_in_flight - self.semaphore.available_permits()
53 }
54
55 pub fn is_full(&self) -> bool {
57 self.semaphore.available_permits() == 0
58 }
59}
60
61#[async_trait]
62impl RpcTransport for BackpressureTransport {
63 async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
64 let permit = self
65 .semaphore
66 .try_acquire()
67 .map_err(|_| TransportError::Overloaded {
68 queue_depth: self.max_in_flight,
69 })?;
70
71 let result = self.inner.send(req).await;
72 drop(permit);
73 result
74 }
75
76 fn health(&self) -> HealthStatus {
77 self.inner.health()
78 }
79
80 fn url(&self) -> &str {
81 self.inner.url()
82 }
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use crate::request::RpcId;
89
90 struct SlowTransport;
91
92 #[async_trait]
93 impl RpcTransport for SlowTransport {
94 async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
95 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
96 Ok(JsonRpcResponse {
97 jsonrpc: "2.0".into(),
98 id: RpcId::Number(1),
99 result: Some(serde_json::json!("0x1")),
100 error: None,
101 })
102 }
103 fn url(&self) -> &str {
104 "mock://slow"
105 }
106 }
107
108 struct InstantTransport;
109
110 #[async_trait]
111 impl RpcTransport for InstantTransport {
112 async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
113 Ok(JsonRpcResponse {
114 jsonrpc: "2.0".into(),
115 id: RpcId::Number(1),
116 result: Some(serde_json::json!("0x1")),
117 error: None,
118 })
119 }
120 fn url(&self) -> &str {
121 "mock://instant"
122 }
123 }
124
125 #[tokio::test]
126 async fn allows_requests_under_limit() {
127 let transport = BackpressureTransport::new(
128 Arc::new(InstantTransport),
129 BackpressureConfig { max_in_flight: 10 },
130 );
131
132 let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
133 let result = transport.send(req).await;
134 assert!(result.is_ok());
135 }
136
137 #[tokio::test]
138 async fn rejects_when_full() {
139 let transport = Arc::new(BackpressureTransport::new(
140 Arc::new(SlowTransport),
141 BackpressureConfig { max_in_flight: 2 },
142 ));
143
144 let t1 = transport.clone();
146 let t2 = transport.clone();
147 let _h1 = tokio::spawn(async move {
148 let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
149 let _ = t1.send(req).await;
150 });
151 let _h2 = tokio::spawn(async move {
152 let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
153 let _ = t2.send(req).await;
154 });
155
156 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
158
159 let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
161 let result = transport.send(req).await;
162 assert!(matches!(result, Err(TransportError::Overloaded { .. })));
163 }
164
165 #[tokio::test]
166 async fn in_flight_tracking() {
167 let transport = BackpressureTransport::new(
168 Arc::new(InstantTransport),
169 BackpressureConfig { max_in_flight: 100 },
170 );
171
172 assert_eq!(transport.in_flight(), 0);
173 assert!(!transport.is_full());
174 }
175}