Skip to main content

chainrpc_core/
backpressure.rs

1//! Backpressure transport wrapper — limits concurrent in-flight requests.
2//!
3//! When the queue is full, returns `TransportError::Overloaded` immediately
4//! instead of queueing unboundedly and risking OOM.
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use tokio::sync::Semaphore;
10
11use crate::error::TransportError;
12use crate::request::{JsonRpcRequest, JsonRpcResponse};
13use crate::transport::{HealthStatus, RpcTransport};
14
/// Tuning knobs for [`BackpressureTransport`].
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Upper bound on the number of requests that may be awaiting a
    /// response at the same time.
    ///
    /// The wrapper hands out one permit per request from a semaphore sized
    /// by this value, so `0` leaves no permits and every request is rejected.
    pub max_in_flight: usize,
}
21
22impl Default for BackpressureConfig {
23    fn default() -> Self {
24        Self {
25            max_in_flight: 1000,
26        }
27    }
28}
29
30/// A transport wrapper that limits concurrent in-flight requests.
31///
32/// If `max_in_flight` requests are already pending, new requests
33/// immediately fail with `TransportError::Overloaded`.
34pub struct BackpressureTransport {
35    inner: Arc<dyn RpcTransport>,
36    semaphore: Semaphore,
37    max_in_flight: usize,
38}
39
40impl BackpressureTransport {
41    /// Create a new backpressure wrapper.
42    pub fn new(inner: Arc<dyn RpcTransport>, config: BackpressureConfig) -> Self {
43        Self {
44            inner,
45            semaphore: Semaphore::new(config.max_in_flight),
46            max_in_flight: config.max_in_flight,
47        }
48    }
49
50    /// Current number of in-flight requests.
51    pub fn in_flight(&self) -> usize {
52        self.max_in_flight - self.semaphore.available_permits()
53    }
54
55    /// Whether the transport is at capacity.
56    pub fn is_full(&self) -> bool {
57        self.semaphore.available_permits() == 0
58    }
59}
60
61#[async_trait]
62impl RpcTransport for BackpressureTransport {
63    async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
64        let permit = self
65            .semaphore
66            .try_acquire()
67            .map_err(|_| TransportError::Overloaded {
68                queue_depth: self.max_in_flight,
69            })?;
70
71        let result = self.inner.send(req).await;
72        drop(permit);
73        result
74    }
75
76    fn health(&self) -> HealthStatus {
77        self.inner.health()
78    }
79
80    fn url(&self) -> &str {
81        self.inner.url()
82    }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request::RpcId;
    use std::time::Duration;

    /// Canned successful response shared by both mock transports.
    fn ok_response() -> JsonRpcResponse {
        JsonRpcResponse {
            jsonrpc: "2.0".into(),
            id: RpcId::Number(1),
            result: Some(serde_json::json!("0x1")),
            error: None,
        }
    }

    /// Mock transport that sleeps for 10 seconds before answering, so a
    /// request sent through it keeps its permit for the whole test.
    struct SlowTransport;

    #[async_trait]
    impl RpcTransport for SlowTransport {
        async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
            tokio::time::sleep(Duration::from_secs(10)).await;
            Ok(ok_response())
        }
        fn url(&self) -> &str {
            "mock://slow"
        }
    }

    /// Mock transport that answers immediately.
    struct InstantTransport;

    #[async_trait]
    impl RpcTransport for InstantTransport {
        async fn send(&self, _req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
            Ok(ok_response())
        }
        fn url(&self) -> &str {
            "mock://instant"
        }
    }

    /// Yields to the scheduler until every permit is taken.
    ///
    /// This replaces a fixed wall-clock sleep: the test proceeds as soon as
    /// the spawned requests hold their permits, and the timeout turns a
    /// would-be hang into a clear failure.
    async fn wait_until_full(transport: &BackpressureTransport) {
        let saturated = async {
            while !transport.is_full() {
                tokio::task::yield_now().await;
            }
        };
        tokio::time::timeout(Duration::from_secs(5), saturated)
            .await
            .expect("spawned requests never acquired their permits");
    }

    #[tokio::test]
    async fn allows_requests_under_limit() {
        let transport = BackpressureTransport::new(
            Arc::new(InstantTransport),
            BackpressureConfig { max_in_flight: 10 },
        );

        let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
        let result = transport.send(req).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn rejects_when_full() {
        let transport = Arc::new(BackpressureTransport::new(
            Arc::new(SlowTransport),
            BackpressureConfig { max_in_flight: 2 },
        ));

        // Fill up both slots with requests that will not finish during the test.
        let holders: Vec<_> = (0..2)
            .map(|_| {
                let t = Arc::clone(&transport);
                tokio::spawn(async move {
                    let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
                    let _ = t.send(req).await;
                })
            })
            .collect();

        wait_until_full(&transport).await;
        assert!(transport.is_full());
        assert_eq!(transport.in_flight(), 2);

        // Third request should be rejected
        let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
        let result = transport.send(req).await;
        assert!(matches!(result, Err(TransportError::Overloaded { .. })));

        // Do not leave the 10-second sleepers running past the assertions.
        for holder in holders {
            holder.abort();
        }
    }

    #[tokio::test]
    async fn in_flight_tracking() {
        let transport = BackpressureTransport::new(
            Arc::new(InstantTransport),
            BackpressureConfig { max_in_flight: 100 },
        );

        assert_eq!(transport.in_flight(), 0);
        assert!(!transport.is_full());

        // A completed request must hand its permit back.
        let req = JsonRpcRequest::auto("eth_blockNumber", vec![]);
        let result = transport.send(req).await;
        assert!(result.is_ok());
        assert_eq!(transport.in_flight(), 0);
        assert!(!transport.is_full());
    }
}