Skip to main content

sof_tx/
providers.rs

1//! Provider traits and basic in-memory adapters used by the transaction SDK.
2
3use std::{
4    net::SocketAddr,
5    sync::{Arc, RwLock},
6    time::Duration,
7};
8
9use reqwest::redirect::Policy;
10use serde::{Deserialize, Serialize};
11use serde_json::from_slice as json_from_slice;
12use sof_support::time_support::nonzero_duration_or;
13use sof_types::PubkeyBytes;
14
15use crate::submit::SubmitTransportError;
16
17/// Maximum HTTP body size accepted from `getLatestBlockhash` RPC responses.
18const MAX_BLOCKHASH_RPC_RESPONSE_BYTES: usize = 64 * 1024;
19/// Default timeout used for recent-blockhash HTTP requests.
20const DEFAULT_RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
21
22/// One leader/validator target that can receive transactions directly.
23#[derive(Debug, Clone, Eq, PartialEq, Hash)]
24pub struct LeaderTarget {
25    /// Optional validator identity.
26    pub identity: Option<PubkeyBytes>,
27    /// TPU/ingress socket used for direct submit.
28    pub tpu_addr: SocketAddr,
29}
30
31impl LeaderTarget {
32    /// Creates a leader target with optional identity.
33    #[must_use]
34    pub const fn new(identity: Option<PubkeyBytes>, tpu_addr: SocketAddr) -> Self {
35        Self { identity, tpu_addr }
36    }
37}
38
39/// Source of the latest recent blockhash bytes.
40pub trait RecentBlockhashProvider: Send + Sync {
41    /// Returns the newest blockhash bytes when available.
42    fn latest_blockhash(&self) -> Option<[u8; 32]>;
43}
44
45/// RPC-backed blockhash provider settings.
46#[derive(Debug, Clone, Eq, PartialEq)]
47pub struct RpcRecentBlockhashProviderConfig {
48    /// HTTP timeout applied to blockhash requests.
49    pub request_timeout: Duration,
50}
51
52impl Default for RpcRecentBlockhashProviderConfig {
53    fn default() -> Self {
54        Self {
55            request_timeout: DEFAULT_RPC_REQUEST_TIMEOUT,
56        }
57    }
58}
59
60/// Cached recent-blockhash provider sourced from a Solana JSON-RPC endpoint.
61#[derive(Debug, Clone)]
62pub struct RpcRecentBlockhashProvider {
63    /// Latest successfully fetched blockhash cached for synchronous readers.
64    latest: Arc<RwLock<Option<[u8; 32]>>>,
65    /// HTTP client reused across on-demand refreshes.
66    client: reqwest::Client,
67    /// Target JSON-RPC endpoint URL.
68    rpc_url: String,
69}
70
71impl RpcRecentBlockhashProvider {
72    /// Creates a provider backed by one RPC endpoint using on-demand refresh.
73    ///
74    /// # Errors
75    ///
76    /// Returns [`SubmitTransportError`] when the HTTP client cannot be built.
77    pub fn new(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
78        let config = RpcRecentBlockhashProviderConfig::default();
79        Self::with_config(rpc_url, &config)
80    }
81
82    /// Creates a provider with explicit request settings.
83    ///
84    /// # Errors
85    ///
86    /// Returns [`SubmitTransportError`] when the HTTP client cannot be built.
87    pub fn with_config(
88        rpc_url: impl Into<String>,
89        config: &RpcRecentBlockhashProviderConfig,
90    ) -> Result<Self, SubmitTransportError> {
91        let rpc_url = rpc_url.into();
92        let request_timeout =
93            nonzero_duration_or(config.request_timeout, DEFAULT_RPC_REQUEST_TIMEOUT);
94        let client = reqwest::Client::builder()
95            .redirect(Policy::none())
96            .connect_timeout(request_timeout)
97            .timeout(request_timeout)
98            .build()
99            .map_err(|error| SubmitTransportError::Config {
100                message: error.to_string(),
101            })?;
102        let latest = Arc::new(RwLock::new(None));
103        Ok(Self {
104            latest,
105            client,
106            rpc_url,
107        })
108    }
109
110    /// Forces one refresh against the configured RPC endpoint and returns the cached value.
111    ///
112    /// # Errors
113    ///
114    /// Returns [`SubmitTransportError`] if the RPC request fails or the response is invalid.
115    pub async fn refresh(&self) -> Result<[u8; 32], SubmitTransportError> {
116        let blockhash = fetch_latest_blockhash(&self.client, &self.rpc_url).await?;
117        let mut latest = self
118            .latest
119            .write()
120            .unwrap_or_else(|poisoned| poisoned.into_inner());
121        *latest = Some(blockhash);
122        Ok(blockhash)
123    }
124}
125
126impl RecentBlockhashProvider for RpcRecentBlockhashProvider {
127    fn latest_blockhash(&self) -> Option<[u8; 32]> {
128        *self
129            .latest
130            .read()
131            .unwrap_or_else(|poisoned| poisoned.into_inner())
132    }
133}
134
135/// Source of current/next leader targets.
136pub trait LeaderProvider: Send + Sync {
137    /// Returns the currently scheduled leader target.
138    fn current_leader(&self) -> Option<LeaderTarget>;
139
140    /// Returns up to `n` upcoming leader targets.
141    fn next_leaders(&self, n: usize) -> Vec<LeaderTarget>;
142}
143
144/// In-memory blockhash provider for tests and static configurations.
145#[derive(Debug, Clone)]
146pub struct StaticRecentBlockhashProvider {
147    /// Optional static blockhash bytes.
148    value: Option<[u8; 32]>,
149}
150
151impl StaticRecentBlockhashProvider {
152    /// Creates a provider with an optional static blockhash.
153    #[must_use]
154    pub const fn new(value: Option<[u8; 32]>) -> Self {
155        Self { value }
156    }
157}
158
159impl RecentBlockhashProvider for StaticRecentBlockhashProvider {
160    fn latest_blockhash(&self) -> Option<[u8; 32]> {
161        self.value
162    }
163}
164
165/// In-memory leader provider for tests and static configurations.
166#[derive(Debug, Clone, Default)]
167pub struct StaticLeaderProvider {
168    /// Optional current leader.
169    current: Option<LeaderTarget>,
170    /// Ordered next leaders.
171    next: Vec<LeaderTarget>,
172}
173
174impl StaticLeaderProvider {
175    /// Creates a static leader provider.
176    #[must_use]
177    pub const fn new(current: Option<LeaderTarget>, next: Vec<LeaderTarget>) -> Self {
178        Self { current, next }
179    }
180}
181
182impl LeaderProvider for StaticLeaderProvider {
183    fn current_leader(&self) -> Option<LeaderTarget> {
184        self.current.clone()
185    }
186
187    fn next_leaders(&self, n: usize) -> Vec<LeaderTarget> {
188        self.next.iter().take(n).cloned().collect()
189    }
190}
191
192/// Minimal JSON-RPC response envelope for `getLatestBlockhash`.
193#[derive(Debug, Deserialize)]
194struct LatestBlockhashRpcResponse {
195    /// Successful RPC result payload.
196    result: Option<LatestBlockhashResult>,
197    /// Error payload returned by the RPC server.
198    error: Option<JsonRpcError>,
199}
200
201/// Parsed `getLatestBlockhash` result object.
202#[derive(Debug, Deserialize)]
203struct LatestBlockhashResult {
204    /// RPC value payload.
205    value: LatestBlockhashValue,
206}
207
208/// JSON payload that carries one base58-encoded recent blockhash.
209#[derive(Debug, Deserialize)]
210struct LatestBlockhashValue {
211    /// Base58-encoded recent blockhash string.
212    blockhash: String,
213}
214
215/// Minimal JSON-RPC error object.
216#[derive(Debug, Deserialize)]
217struct JsonRpcError {
218    /// Numeric JSON-RPC error code.
219    code: i64,
220    /// Human-readable error message.
221    message: String,
222}
223
224/// JSON-RPC request envelope for `getLatestBlockhash`.
225#[derive(Debug, Serialize)]
226struct LatestBlockhashRequest<'request> {
227    /// JSON-RPC protocol version.
228    jsonrpc: &'request str,
229    /// Caller-chosen request ID.
230    id: u64,
231    /// Requested RPC method.
232    method: &'request str,
233    /// Commitment config array.
234    params: [LatestBlockhashRequestConfig<'request>; 1],
235}
236
237/// Commitment config payload for `getLatestBlockhash`.
238#[derive(Debug, Serialize)]
239struct LatestBlockhashRequestConfig<'request> {
240    /// Commitment level requested from RPC.
241    commitment: &'request str,
242}
243
244/// Fetches the latest recent blockhash from one Solana JSON-RPC endpoint.
245async fn fetch_latest_blockhash(
246    client: &reqwest::Client,
247    rpc_url: &str,
248) -> Result<[u8; 32], SubmitTransportError> {
249    let payload = LatestBlockhashRequest {
250        jsonrpc: "2.0",
251        id: 1,
252        method: "getLatestBlockhash",
253        params: [LatestBlockhashRequestConfig {
254            commitment: "processed",
255        }],
256    };
257    let response = client
258        .post(rpc_url)
259        .json(&payload)
260        .send()
261        .await
262        .map_err(|error| SubmitTransportError::Failure {
263            message: error.to_string(),
264        })?;
265    if response.status().is_redirection() {
266        return Err(SubmitTransportError::Failure {
267            message: format!("unexpected redirect response: {}", response.status()),
268        });
269    }
270    let response = response
271        .error_for_status()
272        .map_err(|error| SubmitTransportError::Failure {
273            message: error.to_string(),
274        })?;
275    let response_body = read_http_response_bytes_bounded(response).await?;
276    let parsed: LatestBlockhashRpcResponse =
277        json_from_slice(&response_body).map_err(|error| SubmitTransportError::Failure {
278            message: error.to_string(),
279        })?;
280    if let Some(result) = parsed.result {
281        return parse_blockhash(&result.value.blockhash);
282    }
283    if let Some(error) = parsed.error {
284        return Err(SubmitTransportError::Failure {
285            message: format!("rpc error {}: {}", error.code, error.message),
286        });
287    }
288    Err(SubmitTransportError::Failure {
289        message: "rpc returned neither result nor error".to_owned(),
290    })
291}
292
293/// Reads one RPC response body while enforcing a fixed maximum byte budget.
294async fn read_http_response_bytes_bounded(
295    mut response: reqwest::Response,
296) -> Result<Vec<u8>, SubmitTransportError> {
297    if response
298        .content_length()
299        .is_some_and(|content_length| content_length > MAX_BLOCKHASH_RPC_RESPONSE_BYTES as u64)
300    {
301        return Err(SubmitTransportError::Failure {
302            message: format!(
303                "response body exceeded max size of {MAX_BLOCKHASH_RPC_RESPONSE_BYTES} bytes"
304            ),
305        });
306    }
307
308    let initial_capacity = response
309        .content_length()
310        .and_then(|content_length| usize::try_from(content_length).ok())
311        .unwrap_or(0)
312        .min(MAX_BLOCKHASH_RPC_RESPONSE_BYTES);
313    let mut body = Vec::with_capacity(initial_capacity);
314    while let Some(chunk) =
315        response
316            .chunk()
317            .await
318            .map_err(|error| SubmitTransportError::Failure {
319                message: error.to_string(),
320            })?
321    {
322        let remaining = MAX_BLOCKHASH_RPC_RESPONSE_BYTES.saturating_sub(body.len());
323        if chunk.len() > remaining {
324            return Err(SubmitTransportError::Failure {
325                message: format!(
326                    "response body exceeded max size of {MAX_BLOCKHASH_RPC_RESPONSE_BYTES} bytes"
327                ),
328            });
329        }
330        body.extend_from_slice(&chunk);
331    }
332    Ok(body)
333}
334
335/// Decodes one base58 blockhash string into the byte format used by `TxBuilder`.
336fn parse_blockhash(blockhash: &str) -> Result<[u8; 32], SubmitTransportError> {
337    let decoded =
338        bs58::decode(blockhash)
339            .into_vec()
340            .map_err(|error| SubmitTransportError::Failure {
341                message: format!("failed to decode recent blockhash: {error}"),
342            })?;
343    let bytes: [u8; 32] = decoded
344        .try_into()
345        .map_err(|_error| SubmitTransportError::Failure {
346            message: "rpc blockhash did not decode to 32 bytes".to_owned(),
347        })?;
348    Ok(bytes)
349}
350
351#[cfg(test)]
352#[allow(clippy::indexing_slicing, clippy::panic)]
353mod tests {
354    use super::*;
355    use tokio::{
356        io::{AsyncReadExt, AsyncWriteExt},
357        net::TcpListener,
358    };
359
360    async fn spawn_http_response_server(response: String) -> String {
361        let listener = TcpListener::bind("127.0.0.1:0").await;
362        assert!(listener.is_ok());
363        let listener = listener.unwrap_or_else(|error| panic!("{error}"));
364        let addr = listener.local_addr();
365        assert!(addr.is_ok());
366        let addr = addr.unwrap_or_else(|error| panic!("{error}"));
367        tokio::spawn(async move {
368            let accepted = listener.accept().await;
369            assert!(accepted.is_ok());
370            let (mut stream, _) = accepted.unwrap_or_else(|error| panic!("{error}"));
371            let mut buffer = [0_u8; 4096];
372            let read = stream.read(&mut buffer).await;
373            assert!(read.is_ok());
374            let write = stream.write_all(response.as_bytes()).await;
375            assert!(write.is_ok());
376        });
377        format!("http://{addr}")
378    }
379
380    #[test]
381    fn rpc_recent_blockhash_provider_accepts_zero_timeout_config() {
382        let provider = RpcRecentBlockhashProvider::with_config(
383            "http://127.0.0.1:8899",
384            &RpcRecentBlockhashProviderConfig {
385                request_timeout: Duration::ZERO,
386            },
387        );
388        assert!(provider.is_ok());
389    }
390
391    #[tokio::test]
392    async fn rpc_recent_blockhash_provider_fetches_initial_value() {
393        let expected = [9_u8; 32];
394        let blockhash = bs58::encode(expected).into_string();
395        let listener = TcpListener::bind("127.0.0.1:0").await;
396        assert!(listener.is_ok());
397        let listener = listener.unwrap_or_else(|error| panic!("{error}"));
398        let addr = listener.local_addr();
399        assert!(addr.is_ok());
400        let addr = addr.unwrap_or_else(|error| panic!("{error}"));
401
402        let server = tokio::spawn(async move {
403            let accepted = listener.accept().await;
404            assert!(accepted.is_ok());
405            let (mut stream, _) = accepted.unwrap_or_else(|error| panic!("{error}"));
406            let mut buffer = [0_u8; 4096];
407            let read = stream.read(&mut buffer).await;
408            assert!(read.is_ok());
409            let request = String::from_utf8_lossy(&buffer[..read.unwrap_or(0)]);
410            assert!(request.contains("getLatestBlockhash"));
411            let body = format!(
412                "{{\"jsonrpc\":\"2.0\",\"result\":{{\"value\":{{\"blockhash\":\"{blockhash}\"}}}},\"id\":1}}"
413            );
414            let response = format!(
415                "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
416                body.len(),
417                body
418            );
419            let write = stream.write_all(response.as_bytes()).await;
420            assert!(write.is_ok());
421        });
422
423        let provider = RpcRecentBlockhashProvider::with_config(
424            format!("http://{addr}"),
425            &RpcRecentBlockhashProviderConfig::default(),
426        );
427        assert!(provider.is_ok());
428        let provider = provider.unwrap_or_else(|error| panic!("{error}"));
429        assert_eq!(provider.latest_blockhash(), None);
430        let refreshed = provider.refresh().await;
431        assert!(refreshed.is_ok());
432        assert_eq!(refreshed.unwrap_or([0_u8; 32]), expected);
433        assert_eq!(provider.latest_blockhash(), Some(expected));
434
435        let joined = server.await;
436        assert!(joined.is_ok());
437    }
438
439    #[tokio::test]
440    async fn rpc_recent_blockhash_provider_rejects_redirects() {
441        let target = spawn_http_response_server(
442            "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
443                .to_owned(),
444        )
445        .await;
446        let endpoint = spawn_http_response_server(format!(
447            "HTTP/1.1 307 Temporary Redirect\r\nlocation: {target}\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
448        ))
449        .await;
450
451        let provider = RpcRecentBlockhashProvider::new(endpoint);
452        assert!(provider.is_ok());
453        let provider = provider.unwrap_or_else(|error| panic!("{error}"));
454        let error = match provider.refresh().await {
455            Ok(_blockhash) => panic!("redirect should fail"),
456            Err(error) => error,
457        };
458        assert!(error.to_string().contains("redirect"));
459    }
460
461    #[tokio::test]
462    async fn rpc_recent_blockhash_provider_rejects_oversized_responses() {
463        let endpoint = spawn_http_response_server(format!(
464            "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
465            MAX_BLOCKHASH_RPC_RESPONSE_BYTES.saturating_add(1)
466        ))
467        .await;
468
469        let provider = RpcRecentBlockhashProvider::new(endpoint);
470        assert!(provider.is_ok());
471        let provider = provider.unwrap_or_else(|error| panic!("{error}"));
472        let error = match provider.refresh().await {
473            Ok(_blockhash) => panic!("oversized body should fail"),
474            Err(error) => error,
475        };
476        assert!(error.to_string().contains("exceeded max size"));
477    }
478}