Skip to main content

sof_tx/
providers.rs

1//! Provider traits and basic in-memory adapters used by the transaction SDK.
2
3use std::{
4    net::SocketAddr,
5    sync::{Arc, RwLock},
6    time::Duration,
7};
8
9use serde::{Deserialize, Serialize};
10use solana_pubkey::Pubkey;
11
12use crate::submit::SubmitTransportError;
13
14/// One leader/validator target that can receive transactions directly.
15#[derive(Debug, Clone, Eq, PartialEq, Hash)]
16pub struct LeaderTarget {
17    /// Optional validator identity.
18    pub identity: Option<Pubkey>,
19    /// TPU/ingress socket used for direct submit.
20    pub tpu_addr: SocketAddr,
21}
22
23impl LeaderTarget {
24    /// Creates a leader target with optional identity.
25    #[must_use]
26    pub const fn new(identity: Option<Pubkey>, tpu_addr: SocketAddr) -> Self {
27        Self { identity, tpu_addr }
28    }
29}
30
/// Supplies the most recent blockhash as raw bytes.
///
/// Implementors must be `Send + Sync` so a provider can be shared across threads.
pub trait RecentBlockhashProvider: Send + Sync {
    /// Returns the newest known blockhash bytes, or `None` when no value is available.
    fn latest_blockhash(&self) -> Option<[u8; 32]>;
}
36
/// Settings for the RPC-backed blockhash provider.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcRecentBlockhashProviderConfig {
    /// Timeout applied to each HTTP blockhash request.
    pub request_timeout: Duration,
}

impl Default for RpcRecentBlockhashProviderConfig {
    /// Returns the default settings: a ten-second request timeout.
    fn default() -> Self {
        let request_timeout = Duration::from_secs(10);
        Self { request_timeout }
    }
}
51
52/// Cached recent-blockhash provider sourced from a Solana JSON-RPC endpoint.
53#[derive(Debug, Clone)]
54pub struct RpcRecentBlockhashProvider {
55    /// Latest successfully fetched blockhash cached for synchronous readers.
56    latest: Arc<RwLock<Option<[u8; 32]>>>,
57    /// HTTP client reused across on-demand refreshes.
58    client: reqwest::Client,
59    /// Target JSON-RPC endpoint URL.
60    rpc_url: String,
61}
62
63impl RpcRecentBlockhashProvider {
64    /// Creates a provider backed by one RPC endpoint using on-demand refresh.
65    ///
66    /// # Errors
67    ///
68    /// Returns [`SubmitTransportError`] when the HTTP client cannot be built.
69    pub fn new(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
70        let config = RpcRecentBlockhashProviderConfig::default();
71        Self::with_config(rpc_url, &config)
72    }
73
74    /// Creates a provider with explicit request settings.
75    ///
76    /// # Errors
77    ///
78    /// Returns [`SubmitTransportError`] when the HTTP client cannot be built.
79    pub fn with_config(
80        rpc_url: impl Into<String>,
81        config: &RpcRecentBlockhashProviderConfig,
82    ) -> Result<Self, SubmitTransportError> {
83        let rpc_url = rpc_url.into();
84        let client = reqwest::Client::builder()
85            .timeout(config.request_timeout)
86            .build()
87            .map_err(|error| SubmitTransportError::Config {
88                message: error.to_string(),
89            })?;
90        let latest = Arc::new(RwLock::new(None));
91        Ok(Self {
92            latest,
93            client,
94            rpc_url,
95        })
96    }
97
98    /// Forces one refresh against the configured RPC endpoint and returns the cached value.
99    ///
100    /// # Errors
101    ///
102    /// Returns [`SubmitTransportError`] if the RPC request fails or the response is invalid.
103    pub async fn refresh(&self) -> Result<[u8; 32], SubmitTransportError> {
104        let blockhash = fetch_latest_blockhash(&self.client, &self.rpc_url).await?;
105        let mut latest = self
106            .latest
107            .write()
108            .unwrap_or_else(|poisoned| poisoned.into_inner());
109        *latest = Some(blockhash);
110        Ok(blockhash)
111    }
112}
113
114impl RecentBlockhashProvider for RpcRecentBlockhashProvider {
115    fn latest_blockhash(&self) -> Option<[u8; 32]> {
116        *self
117            .latest
118            .read()
119            .unwrap_or_else(|poisoned| poisoned.into_inner())
120    }
121}
122
123/// Source of current/next leader targets.
124pub trait LeaderProvider: Send + Sync {
125    /// Returns the currently scheduled leader target.
126    fn current_leader(&self) -> Option<LeaderTarget>;
127
128    /// Returns up to `n` upcoming leader targets.
129    fn next_leaders(&self, n: usize) -> Vec<LeaderTarget>;
130}
131
/// Blockhash provider that always reports one fixed in-memory value; intended for tests and
/// static configurations.
#[derive(Debug, Clone)]
pub struct StaticRecentBlockhashProvider {
    /// Fixed blockhash bytes to report, or `None` to report no blockhash.
    value: Option<[u8; 32]>,
}

impl StaticRecentBlockhashProvider {
    /// Builds a provider that reports `value` unchanged.
    #[must_use]
    pub const fn new(value: Option<[u8; 32]>) -> Self {
        StaticRecentBlockhashProvider { value }
    }
}
146
147impl RecentBlockhashProvider for StaticRecentBlockhashProvider {
148    fn latest_blockhash(&self) -> Option<[u8; 32]> {
149        self.value
150    }
151}
152
153/// In-memory leader provider for tests and static configurations.
154#[derive(Debug, Clone, Default)]
155pub struct StaticLeaderProvider {
156    /// Optional current leader.
157    current: Option<LeaderTarget>,
158    /// Ordered next leaders.
159    next: Vec<LeaderTarget>,
160}
161
162impl StaticLeaderProvider {
163    /// Creates a static leader provider.
164    #[must_use]
165    pub const fn new(current: Option<LeaderTarget>, next: Vec<LeaderTarget>) -> Self {
166        Self { current, next }
167    }
168}
169
170impl LeaderProvider for StaticLeaderProvider {
171    fn current_leader(&self) -> Option<LeaderTarget> {
172        self.current.clone()
173    }
174
175    fn next_leaders(&self, n: usize) -> Vec<LeaderTarget> {
176        self.next.iter().take(n).cloned().collect()
177    }
178}
179
180/// Minimal JSON-RPC response envelope for `getLatestBlockhash`.
181#[derive(Debug, Deserialize)]
182struct LatestBlockhashRpcResponse {
183    /// Successful RPC result payload.
184    result: Option<LatestBlockhashResult>,
185    /// Error payload returned by the RPC server.
186    error: Option<JsonRpcError>,
187}
188
189/// Parsed `getLatestBlockhash` result object.
190#[derive(Debug, Deserialize)]
191struct LatestBlockhashResult {
192    /// RPC value payload.
193    value: LatestBlockhashValue,
194}
195
196/// JSON payload that carries one base58-encoded recent blockhash.
197#[derive(Debug, Deserialize)]
198struct LatestBlockhashValue {
199    /// Base58-encoded recent blockhash string.
200    blockhash: String,
201}
202
203/// Minimal JSON-RPC error object.
204#[derive(Debug, Deserialize)]
205struct JsonRpcError {
206    /// Numeric JSON-RPC error code.
207    code: i64,
208    /// Human-readable error message.
209    message: String,
210}
211
212/// JSON-RPC request envelope for `getLatestBlockhash`.
213#[derive(Debug, Serialize)]
214struct LatestBlockhashRequest<'request> {
215    /// JSON-RPC protocol version.
216    jsonrpc: &'request str,
217    /// Caller-chosen request ID.
218    id: u64,
219    /// Requested RPC method.
220    method: &'request str,
221    /// Commitment config array.
222    params: [LatestBlockhashRequestConfig<'request>; 1],
223}
224
225/// Commitment config payload for `getLatestBlockhash`.
226#[derive(Debug, Serialize)]
227struct LatestBlockhashRequestConfig<'request> {
228    /// Commitment level requested from RPC.
229    commitment: &'request str,
230}
231
232/// Fetches the latest recent blockhash from one Solana JSON-RPC endpoint.
233async fn fetch_latest_blockhash(
234    client: &reqwest::Client,
235    rpc_url: &str,
236) -> Result<[u8; 32], SubmitTransportError> {
237    let payload = LatestBlockhashRequest {
238        jsonrpc: "2.0",
239        id: 1,
240        method: "getLatestBlockhash",
241        params: [LatestBlockhashRequestConfig {
242            commitment: "processed",
243        }],
244    };
245    let response = client
246        .post(rpc_url)
247        .json(&payload)
248        .send()
249        .await
250        .map_err(|error| SubmitTransportError::Failure {
251            message: error.to_string(),
252        })?;
253    let response = response
254        .error_for_status()
255        .map_err(|error| SubmitTransportError::Failure {
256            message: error.to_string(),
257        })?;
258    let parsed: LatestBlockhashRpcResponse =
259        response
260            .json()
261            .await
262            .map_err(|error| SubmitTransportError::Failure {
263                message: error.to_string(),
264            })?;
265    if let Some(result) = parsed.result {
266        return parse_blockhash(&result.value.blockhash);
267    }
268    if let Some(error) = parsed.error {
269        return Err(SubmitTransportError::Failure {
270            message: format!("rpc error {}: {}", error.code, error.message),
271        });
272    }
273    Err(SubmitTransportError::Failure {
274        message: "rpc returned neither result nor error".to_owned(),
275    })
276}
277
278/// Decodes one base58 blockhash string into the byte format used by `TxBuilder`.
279fn parse_blockhash(blockhash: &str) -> Result<[u8; 32], SubmitTransportError> {
280    let decoded =
281        bs58::decode(blockhash)
282            .into_vec()
283            .map_err(|error| SubmitTransportError::Failure {
284                message: format!("failed to decode recent blockhash: {error}"),
285            })?;
286    let bytes: [u8; 32] = decoded
287        .try_into()
288        .map_err(|_error| SubmitTransportError::Failure {
289            message: "rpc blockhash did not decode to 32 bytes".to_owned(),
290        })?;
291    Ok(bytes)
292}
293
#[cfg(test)]
// Test-only relaxations: the mock server slices a fixed-size read buffer, and setup failures
// are reported by panicking with the underlying error.
#[allow(clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };

    /// Serves one canned `getLatestBlockhash` reply from a local TCP listener and checks that
    /// `refresh` returns the served blockhash and stores it in the provider's cache.
    #[tokio::test]
    async fn rpc_recent_blockhash_provider_fetches_initial_value() {
        let expected = [9_u8; 32];
        let blockhash = bs58::encode(expected).into_string();
        let listener = TcpListener::bind("127.0.0.1:0").await;
        assert!(listener.is_ok());
        let listener = listener.unwrap_or_else(|error| panic!("{error}"));
        let addr = listener.local_addr();
        assert!(addr.is_ok());
        let addr = addr.unwrap_or_else(|error| panic!("{error}"));

        let server = tokio::spawn(async move {
            let accepted = listener.accept().await;
            assert!(accepted.is_ok());
            let (mut stream, _) = accepted.unwrap_or_else(|error| panic!("{error}"));
            let mut buffer = [0_u8; 4096];
            let mut filled = 0_usize;
            // A single `read` may return only part of the request (for example the headers
            // without the JSON body), so keep reading until the method name has arrived, the
            // peer closes the connection, or the buffer is full.
            loop {
                let read = stream.read(&mut buffer[filled..]).await;
                assert!(read.is_ok());
                let count = read.unwrap_or(0);
                filled = filled.saturating_add(count);
                let method_seen =
                    String::from_utf8_lossy(&buffer[..filled]).contains("getLatestBlockhash");
                if method_seen || count == 0 || filled == buffer.len() {
                    break;
                }
            }
            let request = String::from_utf8_lossy(&buffer[..filled]);
            assert!(request.contains("getLatestBlockhash"));
            let body = format!(
                "{{\"jsonrpc\":\"2.0\",\"result\":{{\"value\":{{\"blockhash\":\"{blockhash}\"}}}},\"id\":1}}"
            );
            let response = format!(
                "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
                body.len(),
                body
            );
            let write = stream.write_all(response.as_bytes()).await;
            assert!(write.is_ok());
        });

        let provider = RpcRecentBlockhashProvider::with_config(
            format!("http://{addr}"),
            &RpcRecentBlockhashProviderConfig::default(),
        );
        assert!(provider.is_ok());
        let provider = provider.unwrap_or_else(|error| panic!("{error}"));
        // The cache must be empty before the first refresh.
        assert_eq!(provider.latest_blockhash(), None);
        let refreshed = provider.refresh().await;
        assert!(refreshed.is_ok());
        assert_eq!(refreshed.unwrap_or([0_u8; 32]), expected);
        assert_eq!(provider.latest_blockhash(), Some(expected));

        // Joining surfaces any assertion failure raised inside the mock server task.
        let joined = server.await;
        assert!(joined.is_ok());
    }
}