1use std::time::Duration;
4
5use async_trait::async_trait;
6use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
7use reqwest::redirect::Policy;
8use serde::{Deserialize, Serialize};
9use serde_json::from_slice as json_from_slice;
10
11use super::{RpcSubmitConfig, RpcSubmitTransport, SubmitTransportError};
12
/// Upper bound, in bytes, on the HTTP response body accepted for a submit call (64 KiB).
const MAX_RPC_SUBMIT_RESPONSE_BYTES: usize = 64 << 10;
15
16#[derive(Debug, Clone)]
18pub struct JsonRpcTransport {
19 client: reqwest::Client,
21 rpc_url: String,
23}
24
25impl JsonRpcTransport {
26 pub fn new(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
32 let client = reqwest::Client::builder()
33 .redirect(Policy::none())
34 .connect_timeout(Duration::from_secs(10))
35 .timeout(Duration::from_secs(10))
36 .build()
37 .map_err(|error| SubmitTransportError::Config {
38 message: error.to_string(),
39 })?;
40 Ok(Self {
41 client,
42 rpc_url: rpc_url.into(),
43 })
44 }
45}
46
47#[derive(Debug, Deserialize)]
49struct JsonRpcResponse {
50 result: Option<String>,
52 error: Option<JsonRpcError>,
54}
55
56#[derive(Debug, Deserialize)]
58struct JsonRpcError {
59 code: i64,
61 message: String,
63}
64
65#[async_trait]
66impl RpcSubmitTransport for JsonRpcTransport {
67 async fn submit_rpc(
68 &self,
69 tx_bytes: &[u8],
70 config: &RpcSubmitConfig,
71 ) -> Result<String, SubmitTransportError> {
72 #[derive(Debug, Serialize)]
73 struct RpcConfig<'config> {
74 encoding: &'config str,
76 #[serde(rename = "skipPreflight")]
78 skip_preflight: bool,
79 #[serde(
81 rename = "preflightCommitment",
82 skip_serializing_if = "Option::is_none"
83 )]
84 preflight_commitment: Option<&'config str>,
85 }
86
87 let encoded_tx = BASE64_STANDARD.encode(tx_bytes);
88 let payload = serde_json::json!({
89 "jsonrpc": "2.0",
90 "id": 1,
91 "method": "sendTransaction",
92 "params": [
93 encoded_tx,
94 RpcConfig {
95 encoding: "base64",
96 skip_preflight: config.skip_preflight,
97 preflight_commitment: config.preflight_commitment.as_deref(),
98 }
99 ]
100 });
101
102 let response = self
103 .client
104 .post(&self.rpc_url)
105 .json(&payload)
106 .send()
107 .await
108 .map_err(|error| SubmitTransportError::Failure {
109 message: error.to_string(),
110 })?;
111 if response.status().is_redirection() {
112 return Err(SubmitTransportError::Failure {
113 message: format!("unexpected redirect response: {}", response.status()),
114 });
115 }
116
117 let response =
118 response
119 .error_for_status()
120 .map_err(|error| SubmitTransportError::Failure {
121 message: error.to_string(),
122 })?;
123
124 let response_body = read_http_response_bytes_bounded(response).await?;
125 let parsed: JsonRpcResponse =
126 json_from_slice(&response_body).map_err(|error| SubmitTransportError::Failure {
127 message: error.to_string(),
128 })?;
129
130 if let Some(signature) = parsed.result {
131 return Ok(signature);
132 }
133 if let Some(error) = parsed.error {
134 return Err(SubmitTransportError::Failure {
135 message: format!("rpc error {}: {}", error.code, error.message),
136 });
137 }
138
139 Err(SubmitTransportError::Failure {
140 message: "rpc returned neither result nor error".to_owned(),
141 })
142 }
143}
144
145async fn read_http_response_bytes_bounded(
147 mut response: reqwest::Response,
148) -> Result<Vec<u8>, SubmitTransportError> {
149 if response
150 .content_length()
151 .is_some_and(|content_length| content_length > MAX_RPC_SUBMIT_RESPONSE_BYTES as u64)
152 {
153 return Err(SubmitTransportError::Failure {
154 message: format!(
155 "response body exceeded max size of {MAX_RPC_SUBMIT_RESPONSE_BYTES} bytes"
156 ),
157 });
158 }
159
160 let initial_capacity = response
161 .content_length()
162 .and_then(|content_length| usize::try_from(content_length).ok())
163 .unwrap_or(0)
164 .min(MAX_RPC_SUBMIT_RESPONSE_BYTES);
165 let mut body = Vec::with_capacity(initial_capacity);
166 while let Some(chunk) =
167 response
168 .chunk()
169 .await
170 .map_err(|error| SubmitTransportError::Failure {
171 message: error.to_string(),
172 })?
173 {
174 let remaining = MAX_RPC_SUBMIT_RESPONSE_BYTES.saturating_sub(body.len());
175 if chunk.len() > remaining {
176 return Err(SubmitTransportError::Failure {
177 message: format!(
178 "response body exceeded max size of {MAX_RPC_SUBMIT_RESPONSE_BYTES} bytes"
179 ),
180 });
181 }
182 body.extend_from_slice(&chunk);
183 }
184 Ok(body)
185}
186
#[cfg(test)]
#[allow(clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };

    /// Asserts that `outcome` is `Ok` and returns its value, panicking with
    /// the error's text otherwise.
    fn require_ok<T, E: std::fmt::Display>(outcome: Result<T, E>) -> T {
        assert!(outcome.is_ok());
        outcome.unwrap_or_else(|error| panic!("{error}"))
    }

    /// Starts a one-shot server on an ephemeral loopback port and returns its
    /// `http://` base URL.
    ///
    /// The server accepts a single connection, performs one read of the
    /// incoming request and then writes `response` verbatim.
    async fn spawn_http_response_server(response: String) -> String {
        let listener = require_ok(TcpListener::bind("127.0.0.1:0").await);
        let addr = require_ok(listener.local_addr());
        tokio::spawn(async move {
            let (mut stream, _) = require_ok(listener.accept().await);
            let mut request = [0_u8; 4096];
            let read = stream.read(&mut request).await;
            assert!(read.is_ok());
            let write = stream.write_all(response.as_bytes()).await;
            assert!(write.is_ok());
        });
        format!("http://{addr}")
    }

    /// Submits a fixed three-byte payload to `endpoint` and returns the text
    /// of the resulting error, panicking with `unexpected_success` if the
    /// submission succeeds.
    async fn submit_expecting_failure(endpoint: String, unexpected_success: &str) -> String {
        let transport = require_ok(JsonRpcTransport::new(endpoint));
        let outcome = transport
            .submit_rpc(&[1, 2, 3], &RpcSubmitConfig::default())
            .await;
        assert!(outcome.is_err());
        match outcome {
            Ok(_signature) => panic!("{unexpected_success}"),
            Err(error) => error.to_string(),
        }
    }

    #[tokio::test]
    async fn json_rpc_transport_rejects_redirects() {
        let endpoint = spawn_http_response_server(
            "HTTP/1.1 307 Temporary Redirect\r\nlocation: http://127.0.0.1/\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
                .to_owned(),
        )
        .await;

        let message = submit_expecting_failure(endpoint, "redirect should fail").await;
        assert!(message.contains("redirect"));
    }

    #[tokio::test]
    async fn json_rpc_transport_rejects_oversized_responses() {
        // Only the headers are sent: the advertised length alone must be
        // enough for the transport to refuse the response.
        let endpoint = spawn_http_response_server(format!(
            "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
            MAX_RPC_SUBMIT_RESPONSE_BYTES.saturating_add(1)
        ))
        .await;

        let message = submit_expecting_failure(endpoint, "oversized body should fail").await;
        assert!(message.contains("exceeded max size"));
    }
}