// sof_tx/submit/rpc.rs

//! JSON-RPC submit transport implementation.

use std::time::Duration;

use async_trait::async_trait;
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
use reqwest::redirect::Policy;
use serde::{Deserialize, Serialize};
use serde_json::from_slice as json_from_slice;

use super::{RpcSubmitConfig, RpcSubmitTransport, SubmitTransportError};

/// Maximum HTTP body size, in bytes, accepted from JSON-RPC submit responses (64 KiB).
///
/// Larger bodies are rejected instead of being buffered in memory.
const MAX_RPC_SUBMIT_RESPONSE_BYTES: usize = 64 * 1024;

16/// JSON-RPC transport that submits encoded transactions via `sendTransaction`.
17#[derive(Debug, Clone)]
18pub struct JsonRpcTransport {
19    /// HTTP client used for RPC calls.
20    client: reqwest::Client,
21    /// Target JSON-RPC endpoint URL.
22    rpc_url: String,
23}
24
25impl JsonRpcTransport {
26    /// Creates a JSON-RPC transport.
27    ///
28    /// # Errors
29    ///
30    /// Returns [`SubmitTransportError::Config`] when HTTP client creation fails.
31    pub fn new(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
32        let client = reqwest::Client::builder()
33            .redirect(Policy::none())
34            .connect_timeout(Duration::from_secs(10))
35            .timeout(Duration::from_secs(10))
36            .build()
37            .map_err(|error| SubmitTransportError::Config {
38                message: error.to_string(),
39            })?;
40        Ok(Self {
41            client,
42            rpc_url: rpc_url.into(),
43        })
44    }
45}
46
47/// JSON-RPC envelope.
48#[derive(Debug, Deserialize)]
49struct JsonRpcResponse {
50    /// Result value for successful calls.
51    result: Option<String>,
52    /// Error payload for failed calls.
53    error: Option<JsonRpcError>,
54}
55
56/// JSON-RPC error object.
57#[derive(Debug, Deserialize)]
58struct JsonRpcError {
59    /// JSON-RPC error code.
60    code: i64,
61    /// Human-readable message.
62    message: String,
63}
64
65#[async_trait]
66impl RpcSubmitTransport for JsonRpcTransport {
67    async fn submit_rpc(
68        &self,
69        tx_bytes: &[u8],
70        config: &RpcSubmitConfig,
71    ) -> Result<String, SubmitTransportError> {
72        #[derive(Debug, Serialize)]
73        struct RpcConfig<'config> {
74            /// Transaction encoding format.
75            encoding: &'config str,
76            /// Optional preflight skip flag.
77            #[serde(rename = "skipPreflight")]
78            skip_preflight: bool,
79            /// Optional preflight commitment.
80            #[serde(
81                rename = "preflightCommitment",
82                skip_serializing_if = "Option::is_none"
83            )]
84            preflight_commitment: Option<&'config str>,
85        }
86
87        let encoded_tx = BASE64_STANDARD.encode(tx_bytes);
88        let payload = serde_json::json!({
89            "jsonrpc": "2.0",
90            "id": 1,
91            "method": "sendTransaction",
92            "params": [
93                encoded_tx,
94                RpcConfig {
95                    encoding: "base64",
96                    skip_preflight: config.skip_preflight,
97                    preflight_commitment: config.preflight_commitment.as_deref(),
98                }
99            ]
100        });
101
102        let response = self
103            .client
104            .post(&self.rpc_url)
105            .json(&payload)
106            .send()
107            .await
108            .map_err(|error| SubmitTransportError::Failure {
109                message: error.to_string(),
110            })?;
111        if response.status().is_redirection() {
112            return Err(SubmitTransportError::Failure {
113                message: format!("unexpected redirect response: {}", response.status()),
114            });
115        }
116
117        let response =
118            response
119                .error_for_status()
120                .map_err(|error| SubmitTransportError::Failure {
121                    message: error.to_string(),
122                })?;
123
124        let response_body = read_http_response_bytes_bounded(response).await?;
125        let parsed: JsonRpcResponse =
126            json_from_slice(&response_body).map_err(|error| SubmitTransportError::Failure {
127                message: error.to_string(),
128            })?;
129
130        if let Some(signature) = parsed.result {
131            return Ok(signature);
132        }
133        if let Some(error) = parsed.error {
134            return Err(SubmitTransportError::Failure {
135                message: format!("rpc error {}: {}", error.code, error.message),
136            });
137        }
138
139        Err(SubmitTransportError::Failure {
140            message: "rpc returned neither result nor error".to_owned(),
141        })
142    }
143}
144
145/// Reads one submit response body while enforcing a fixed maximum byte budget.
146async fn read_http_response_bytes_bounded(
147    mut response: reqwest::Response,
148) -> Result<Vec<u8>, SubmitTransportError> {
149    if response
150        .content_length()
151        .is_some_and(|content_length| content_length > MAX_RPC_SUBMIT_RESPONSE_BYTES as u64)
152    {
153        return Err(SubmitTransportError::Failure {
154            message: format!(
155                "response body exceeded max size of {MAX_RPC_SUBMIT_RESPONSE_BYTES} bytes"
156            ),
157        });
158    }
159
160    let initial_capacity = response
161        .content_length()
162        .and_then(|content_length| usize::try_from(content_length).ok())
163        .unwrap_or(0)
164        .min(MAX_RPC_SUBMIT_RESPONSE_BYTES);
165    let mut body = Vec::with_capacity(initial_capacity);
166    while let Some(chunk) =
167        response
168            .chunk()
169            .await
170            .map_err(|error| SubmitTransportError::Failure {
171                message: error.to_string(),
172            })?
173    {
174        let remaining = MAX_RPC_SUBMIT_RESPONSE_BYTES.saturating_sub(body.len());
175        if chunk.len() > remaining {
176            return Err(SubmitTransportError::Failure {
177                message: format!(
178                    "response body exceeded max size of {MAX_RPC_SUBMIT_RESPONSE_BYTES} bytes"
179                ),
180            });
181        }
182        body.extend_from_slice(&chunk);
183    }
184    Ok(body)
185}
186
#[cfg(test)]
// Test setup aborts through `panic!` when a precondition fails.
#[allow(clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };

    /// Starts a one-shot HTTP server on an ephemeral loopback port.
    ///
    /// The spawned task accepts a single connection, performs one read of up to 4 KiB of
    /// the request, writes `response` verbatim, and then drops the connection. Returns the
    /// `http://` URL of the listener.
    async fn spawn_http_response_server(response: String) -> String {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap_or_else(|error| panic!("{error}"));
        let addr = listener
            .local_addr()
            .unwrap_or_else(|error| panic!("{error}"));
        tokio::spawn(async move {
            let (mut stream, _) = listener
                .accept()
                .await
                .unwrap_or_else(|error| panic!("{error}"));
            let mut buffer = [0_u8; 4096];
            let read = stream.read(&mut buffer).await;
            assert!(read.is_ok());
            let write = stream.write_all(response.as_bytes()).await;
            assert!(write.is_ok());
        });
        format!("http://{addr}")
    }

    /// Serves `response` once and returns the error `submit_rpc` reports for it.
    ///
    /// Panics with `expectation` when the submission unexpectedly succeeds.
    async fn submit_error_for(response: String, expectation: &str) -> SubmitTransportError {
        let endpoint = spawn_http_response_server(response).await;
        let transport = JsonRpcTransport::new(endpoint).unwrap_or_else(|error| panic!("{error}"));

        match transport
            .submit_rpc(&[1, 2, 3], &RpcSubmitConfig::default())
            .await
        {
            Ok(_signature) => panic!("{expectation}"),
            Err(error) => error,
        }
    }

    #[tokio::test]
    async fn json_rpc_transport_rejects_redirects() {
        let error = submit_error_for(
            "HTTP/1.1 307 Temporary Redirect\r\nlocation: http://127.0.0.1/\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
                .to_owned(),
            "redirect should fail",
        )
        .await;
        assert!(error.to_string().contains("redirect"));
    }

    #[tokio::test]
    async fn json_rpc_transport_rejects_oversized_responses() {
        // Only the declared length matters here: the transport rejects the response
        // from its `content-length` header before reading any body bytes.
        let error = submit_error_for(
            format!(
                "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
                MAX_RPC_SUBMIT_RESPONSE_BYTES.saturating_add(1)
            ),
            "oversized body should fail",
        )
        .await;
        assert!(error.to_string().contains("exceeded max size"));
    }
}