1use std::{
4 net::SocketAddr,
5 sync::{Arc, RwLock},
6 time::Duration,
7};
8
9use reqwest::redirect::Policy;
10use serde::{Deserialize, Serialize};
11use serde_json::from_slice as json_from_slice;
12use sof_support::time_support::nonzero_duration_or;
13use sof_types::PubkeyBytes;
14
15use crate::submit::SubmitTransportError;
16
17const MAX_BLOCKHASH_RPC_RESPONSE_BYTES: usize = 64 * 1024;
19const DEFAULT_RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
21
22#[derive(Debug, Clone, Eq, PartialEq, Hash)]
24pub struct LeaderTarget {
25 pub identity: Option<PubkeyBytes>,
27 pub tpu_addr: SocketAddr,
29}
30
31impl LeaderTarget {
32 #[must_use]
34 pub const fn new(identity: Option<PubkeyBytes>, tpu_addr: SocketAddr) -> Self {
35 Self { identity, tpu_addr }
36 }
37}
38
39pub trait RecentBlockhashProvider: Send + Sync {
41 fn latest_blockhash(&self) -> Option<[u8; 32]>;
43}
44
45#[derive(Debug, Clone, Eq, PartialEq)]
47pub struct RpcRecentBlockhashProviderConfig {
48 pub request_timeout: Duration,
50}
51
52impl Default for RpcRecentBlockhashProviderConfig {
53 fn default() -> Self {
54 Self {
55 request_timeout: DEFAULT_RPC_REQUEST_TIMEOUT,
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct RpcRecentBlockhashProvider {
63 latest: Arc<RwLock<Option<[u8; 32]>>>,
65 client: reqwest::Client,
67 rpc_url: String,
69}
70
71impl RpcRecentBlockhashProvider {
72 pub fn new(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
78 let config = RpcRecentBlockhashProviderConfig::default();
79 Self::with_config(rpc_url, &config)
80 }
81
82 pub fn with_config(
88 rpc_url: impl Into<String>,
89 config: &RpcRecentBlockhashProviderConfig,
90 ) -> Result<Self, SubmitTransportError> {
91 let rpc_url = rpc_url.into();
92 let request_timeout =
93 nonzero_duration_or(config.request_timeout, DEFAULT_RPC_REQUEST_TIMEOUT);
94 let client = reqwest::Client::builder()
95 .redirect(Policy::none())
96 .connect_timeout(request_timeout)
97 .timeout(request_timeout)
98 .build()
99 .map_err(|error| SubmitTransportError::Config {
100 message: error.to_string(),
101 })?;
102 let latest = Arc::new(RwLock::new(None));
103 Ok(Self {
104 latest,
105 client,
106 rpc_url,
107 })
108 }
109
110 pub async fn refresh(&self) -> Result<[u8; 32], SubmitTransportError> {
116 let blockhash = fetch_latest_blockhash(&self.client, &self.rpc_url).await?;
117 let mut latest = self
118 .latest
119 .write()
120 .unwrap_or_else(|poisoned| poisoned.into_inner());
121 *latest = Some(blockhash);
122 Ok(blockhash)
123 }
124}
125
126impl RecentBlockhashProvider for RpcRecentBlockhashProvider {
127 fn latest_blockhash(&self) -> Option<[u8; 32]> {
128 *self
129 .latest
130 .read()
131 .unwrap_or_else(|poisoned| poisoned.into_inner())
132 }
133}
134
135pub trait LeaderProvider: Send + Sync {
137 fn current_leader(&self) -> Option<LeaderTarget>;
139
140 fn next_leaders(&self, n: usize) -> Vec<LeaderTarget>;
142}
143
144#[derive(Debug, Clone)]
146pub struct StaticRecentBlockhashProvider {
147 value: Option<[u8; 32]>,
149}
150
151impl StaticRecentBlockhashProvider {
152 #[must_use]
154 pub const fn new(value: Option<[u8; 32]>) -> Self {
155 Self { value }
156 }
157}
158
159impl RecentBlockhashProvider for StaticRecentBlockhashProvider {
160 fn latest_blockhash(&self) -> Option<[u8; 32]> {
161 self.value
162 }
163}
164
165#[derive(Debug, Clone, Default)]
167pub struct StaticLeaderProvider {
168 current: Option<LeaderTarget>,
170 next: Vec<LeaderTarget>,
172}
173
174impl StaticLeaderProvider {
175 #[must_use]
177 pub const fn new(current: Option<LeaderTarget>, next: Vec<LeaderTarget>) -> Self {
178 Self { current, next }
179 }
180}
181
182impl LeaderProvider for StaticLeaderProvider {
183 fn current_leader(&self) -> Option<LeaderTarget> {
184 self.current.clone()
185 }
186
187 fn next_leaders(&self, n: usize) -> Vec<LeaderTarget> {
188 self.next.iter().take(n).cloned().collect()
189 }
190}
191
192#[derive(Debug, Deserialize)]
194struct LatestBlockhashRpcResponse {
195 result: Option<LatestBlockhashResult>,
197 error: Option<JsonRpcError>,
199}
200
201#[derive(Debug, Deserialize)]
203struct LatestBlockhashResult {
204 value: LatestBlockhashValue,
206}
207
208#[derive(Debug, Deserialize)]
210struct LatestBlockhashValue {
211 blockhash: String,
213}
214
215#[derive(Debug, Deserialize)]
217struct JsonRpcError {
218 code: i64,
220 message: String,
222}
223
224#[derive(Debug, Serialize)]
226struct LatestBlockhashRequest<'request> {
227 jsonrpc: &'request str,
229 id: u64,
231 method: &'request str,
233 params: [LatestBlockhashRequestConfig<'request>; 1],
235}
236
237#[derive(Debug, Serialize)]
239struct LatestBlockhashRequestConfig<'request> {
240 commitment: &'request str,
242}
243
244async fn fetch_latest_blockhash(
246 client: &reqwest::Client,
247 rpc_url: &str,
248) -> Result<[u8; 32], SubmitTransportError> {
249 let payload = LatestBlockhashRequest {
250 jsonrpc: "2.0",
251 id: 1,
252 method: "getLatestBlockhash",
253 params: [LatestBlockhashRequestConfig {
254 commitment: "processed",
255 }],
256 };
257 let response = client
258 .post(rpc_url)
259 .json(&payload)
260 .send()
261 .await
262 .map_err(|error| SubmitTransportError::Failure {
263 message: error.to_string(),
264 })?;
265 if response.status().is_redirection() {
266 return Err(SubmitTransportError::Failure {
267 message: format!("unexpected redirect response: {}", response.status()),
268 });
269 }
270 let response = response
271 .error_for_status()
272 .map_err(|error| SubmitTransportError::Failure {
273 message: error.to_string(),
274 })?;
275 let response_body = read_http_response_bytes_bounded(response).await?;
276 let parsed: LatestBlockhashRpcResponse =
277 json_from_slice(&response_body).map_err(|error| SubmitTransportError::Failure {
278 message: error.to_string(),
279 })?;
280 if let Some(result) = parsed.result {
281 return parse_blockhash(&result.value.blockhash);
282 }
283 if let Some(error) = parsed.error {
284 return Err(SubmitTransportError::Failure {
285 message: format!("rpc error {}: {}", error.code, error.message),
286 });
287 }
288 Err(SubmitTransportError::Failure {
289 message: "rpc returned neither result nor error".to_owned(),
290 })
291}
292
293async fn read_http_response_bytes_bounded(
295 mut response: reqwest::Response,
296) -> Result<Vec<u8>, SubmitTransportError> {
297 if response
298 .content_length()
299 .is_some_and(|content_length| content_length > MAX_BLOCKHASH_RPC_RESPONSE_BYTES as u64)
300 {
301 return Err(SubmitTransportError::Failure {
302 message: format!(
303 "response body exceeded max size of {MAX_BLOCKHASH_RPC_RESPONSE_BYTES} bytes"
304 ),
305 });
306 }
307
308 let initial_capacity = response
309 .content_length()
310 .and_then(|content_length| usize::try_from(content_length).ok())
311 .unwrap_or(0)
312 .min(MAX_BLOCKHASH_RPC_RESPONSE_BYTES);
313 let mut body = Vec::with_capacity(initial_capacity);
314 while let Some(chunk) =
315 response
316 .chunk()
317 .await
318 .map_err(|error| SubmitTransportError::Failure {
319 message: error.to_string(),
320 })?
321 {
322 let remaining = MAX_BLOCKHASH_RPC_RESPONSE_BYTES.saturating_sub(body.len());
323 if chunk.len() > remaining {
324 return Err(SubmitTransportError::Failure {
325 message: format!(
326 "response body exceeded max size of {MAX_BLOCKHASH_RPC_RESPONSE_BYTES} bytes"
327 ),
328 });
329 }
330 body.extend_from_slice(&chunk);
331 }
332 Ok(body)
333}
334
335fn parse_blockhash(blockhash: &str) -> Result<[u8; 32], SubmitTransportError> {
337 let decoded =
338 bs58::decode(blockhash)
339 .into_vec()
340 .map_err(|error| SubmitTransportError::Failure {
341 message: format!("failed to decode recent blockhash: {error}"),
342 })?;
343 let bytes: [u8; 32] = decoded
344 .try_into()
345 .map_err(|_error| SubmitTransportError::Failure {
346 message: "rpc blockhash did not decode to 32 bytes".to_owned(),
347 })?;
348 Ok(bytes)
349}
350
351#[cfg(test)]
352#[allow(clippy::indexing_slicing, clippy::panic)]
353mod tests {
354 use super::*;
355 use tokio::{
356 io::{AsyncReadExt, AsyncWriteExt},
357 net::TcpListener,
358 };
359
360 async fn spawn_http_response_server(response: String) -> String {
361 let listener = TcpListener::bind("127.0.0.1:0").await;
362 assert!(listener.is_ok());
363 let listener = listener.unwrap_or_else(|error| panic!("{error}"));
364 let addr = listener.local_addr();
365 assert!(addr.is_ok());
366 let addr = addr.unwrap_or_else(|error| panic!("{error}"));
367 tokio::spawn(async move {
368 let accepted = listener.accept().await;
369 assert!(accepted.is_ok());
370 let (mut stream, _) = accepted.unwrap_or_else(|error| panic!("{error}"));
371 let mut buffer = [0_u8; 4096];
372 let read = stream.read(&mut buffer).await;
373 assert!(read.is_ok());
374 let write = stream.write_all(response.as_bytes()).await;
375 assert!(write.is_ok());
376 });
377 format!("http://{addr}")
378 }
379
380 #[test]
381 fn rpc_recent_blockhash_provider_accepts_zero_timeout_config() {
382 let provider = RpcRecentBlockhashProvider::with_config(
383 "http://127.0.0.1:8899",
384 &RpcRecentBlockhashProviderConfig {
385 request_timeout: Duration::ZERO,
386 },
387 );
388 assert!(provider.is_ok());
389 }
390
391 #[tokio::test]
392 async fn rpc_recent_blockhash_provider_fetches_initial_value() {
393 let expected = [9_u8; 32];
394 let blockhash = bs58::encode(expected).into_string();
395 let listener = TcpListener::bind("127.0.0.1:0").await;
396 assert!(listener.is_ok());
397 let listener = listener.unwrap_or_else(|error| panic!("{error}"));
398 let addr = listener.local_addr();
399 assert!(addr.is_ok());
400 let addr = addr.unwrap_or_else(|error| panic!("{error}"));
401
402 let server = tokio::spawn(async move {
403 let accepted = listener.accept().await;
404 assert!(accepted.is_ok());
405 let (mut stream, _) = accepted.unwrap_or_else(|error| panic!("{error}"));
406 let mut buffer = [0_u8; 4096];
407 let read = stream.read(&mut buffer).await;
408 assert!(read.is_ok());
409 let request = String::from_utf8_lossy(&buffer[..read.unwrap_or(0)]);
410 assert!(request.contains("getLatestBlockhash"));
411 let body = format!(
412 "{{\"jsonrpc\":\"2.0\",\"result\":{{\"value\":{{\"blockhash\":\"{blockhash}\"}}}},\"id\":1}}"
413 );
414 let response = format!(
415 "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
416 body.len(),
417 body
418 );
419 let write = stream.write_all(response.as_bytes()).await;
420 assert!(write.is_ok());
421 });
422
423 let provider = RpcRecentBlockhashProvider::with_config(
424 format!("http://{addr}"),
425 &RpcRecentBlockhashProviderConfig::default(),
426 );
427 assert!(provider.is_ok());
428 let provider = provider.unwrap_or_else(|error| panic!("{error}"));
429 assert_eq!(provider.latest_blockhash(), None);
430 let refreshed = provider.refresh().await;
431 assert!(refreshed.is_ok());
432 assert_eq!(refreshed.unwrap_or([0_u8; 32]), expected);
433 assert_eq!(provider.latest_blockhash(), Some(expected));
434
435 let joined = server.await;
436 assert!(joined.is_ok());
437 }
438
439 #[tokio::test]
440 async fn rpc_recent_blockhash_provider_rejects_redirects() {
441 let target = spawn_http_response_server(
442 "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
443 .to_owned(),
444 )
445 .await;
446 let endpoint = spawn_http_response_server(format!(
447 "HTTP/1.1 307 Temporary Redirect\r\nlocation: {target}\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
448 ))
449 .await;
450
451 let provider = RpcRecentBlockhashProvider::new(endpoint);
452 assert!(provider.is_ok());
453 let provider = provider.unwrap_or_else(|error| panic!("{error}"));
454 let error = match provider.refresh().await {
455 Ok(_blockhash) => panic!("redirect should fail"),
456 Err(error) => error,
457 };
458 assert!(error.to_string().contains("redirect"));
459 }
460
461 #[tokio::test]
462 async fn rpc_recent_blockhash_provider_rejects_oversized_responses() {
463 let endpoint = spawn_http_response_server(format!(
464 "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
465 MAX_BLOCKHASH_RPC_RESPONSE_BYTES.saturating_add(1)
466 ))
467 .await;
468
469 let provider = RpcRecentBlockhashProvider::new(endpoint);
470 assert!(provider.is_ok());
471 let provider = provider.unwrap_or_else(|error| panic!("{error}"));
472 let error = match provider.refresh().await {
473 Ok(_blockhash) => panic!("oversized body should fail"),
474 Err(error) => error,
475 };
476 assert!(error.to_string().contains("exceeded max size"));
477 }
478}