1use std::{
4 net::SocketAddr,
5 sync::{Arc, RwLock},
6 time::Duration,
7};
8
9use serde::{Deserialize, Serialize};
10use sof_types::PubkeyBytes;
11
12use crate::submit::SubmitTransportError;
13
14#[derive(Debug, Clone, Eq, PartialEq, Hash)]
16pub struct LeaderTarget {
17 pub identity: Option<PubkeyBytes>,
19 pub tpu_addr: SocketAddr,
21}
22
23impl LeaderTarget {
24 #[must_use]
26 pub const fn new(identity: Option<PubkeyBytes>, tpu_addr: SocketAddr) -> Self {
27 Self { identity, tpu_addr }
28 }
29}
30
/// Source of the most recently known blockhash.
///
/// Implementors must be `Send + Sync` so a provider can be shared between threads.
pub trait RecentBlockhashProvider: Send + Sync {
    /// Returns the latest known 32-byte blockhash, or `None` when none is available.
    fn latest_blockhash(&self) -> Option<[u8; 32]>;
}
36
/// Settings applied when constructing an [`RpcRecentBlockhashProvider`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RpcRecentBlockhashProviderConfig {
    /// Timeout passed to the `reqwest` client builder.
    pub request_timeout: Duration,
}

impl Default for RpcRecentBlockhashProviderConfig {
    /// Returns a configuration with a ten-second request timeout.
    fn default() -> Self {
        let request_timeout = Duration::from_secs(10);
        Self { request_timeout }
    }
}
51
52#[derive(Debug, Clone)]
54pub struct RpcRecentBlockhashProvider {
55 latest: Arc<RwLock<Option<[u8; 32]>>>,
57 client: reqwest::Client,
59 rpc_url: String,
61}
62
63impl RpcRecentBlockhashProvider {
64 pub fn new(rpc_url: impl Into<String>) -> Result<Self, SubmitTransportError> {
70 let config = RpcRecentBlockhashProviderConfig::default();
71 Self::with_config(rpc_url, &config)
72 }
73
74 pub fn with_config(
80 rpc_url: impl Into<String>,
81 config: &RpcRecentBlockhashProviderConfig,
82 ) -> Result<Self, SubmitTransportError> {
83 let rpc_url = rpc_url.into();
84 let client = reqwest::Client::builder()
85 .timeout(config.request_timeout)
86 .build()
87 .map_err(|error| SubmitTransportError::Config {
88 message: error.to_string(),
89 })?;
90 let latest = Arc::new(RwLock::new(None));
91 Ok(Self {
92 latest,
93 client,
94 rpc_url,
95 })
96 }
97
98 pub async fn refresh(&self) -> Result<[u8; 32], SubmitTransportError> {
104 let blockhash = fetch_latest_blockhash(&self.client, &self.rpc_url).await?;
105 let mut latest = self
106 .latest
107 .write()
108 .unwrap_or_else(|poisoned| poisoned.into_inner());
109 *latest = Some(blockhash);
110 Ok(blockhash)
111 }
112}
113
114impl RecentBlockhashProvider for RpcRecentBlockhashProvider {
115 fn latest_blockhash(&self) -> Option<[u8; 32]> {
116 *self
117 .latest
118 .read()
119 .unwrap_or_else(|poisoned| poisoned.into_inner())
120 }
121}
122
123pub trait LeaderProvider: Send + Sync {
125 fn current_leader(&self) -> Option<LeaderTarget>;
127
128 fn next_leaders(&self, n: usize) -> Vec<LeaderTarget>;
130}
131
/// Blockhash provider that always reports the value it was constructed with.
///
/// Derives `Default` (yielding a provider with no blockhash) so it mirrors
/// `StaticLeaderProvider`, which is also default-constructible.
#[derive(Debug, Clone, Default)]
pub struct StaticRecentBlockhashProvider {
    /// Fixed blockhash handed back on every query; `None` means "no blockhash".
    value: Option<[u8; 32]>,
}

impl StaticRecentBlockhashProvider {
    /// Creates a provider that reports `value` unchanged.
    #[must_use]
    pub const fn new(value: Option<[u8; 32]>) -> Self {
        Self { value }
    }
}
146
147impl RecentBlockhashProvider for StaticRecentBlockhashProvider {
148 fn latest_blockhash(&self) -> Option<[u8; 32]> {
149 self.value
150 }
151}
152
153#[derive(Debug, Clone, Default)]
155pub struct StaticLeaderProvider {
156 current: Option<LeaderTarget>,
158 next: Vec<LeaderTarget>,
160}
161
162impl StaticLeaderProvider {
163 #[must_use]
165 pub const fn new(current: Option<LeaderTarget>, next: Vec<LeaderTarget>) -> Self {
166 Self { current, next }
167 }
168}
169
170impl LeaderProvider for StaticLeaderProvider {
171 fn current_leader(&self) -> Option<LeaderTarget> {
172 self.current.clone()
173 }
174
175 fn next_leaders(&self, n: usize) -> Vec<LeaderTarget> {
176 self.next.iter().take(n).cloned().collect()
177 }
178}
179
180#[derive(Debug, Deserialize)]
182struct LatestBlockhashRpcResponse {
183 result: Option<LatestBlockhashResult>,
185 error: Option<JsonRpcError>,
187}
188
189#[derive(Debug, Deserialize)]
191struct LatestBlockhashResult {
192 value: LatestBlockhashValue,
194}
195
196#[derive(Debug, Deserialize)]
198struct LatestBlockhashValue {
199 blockhash: String,
201}
202
203#[derive(Debug, Deserialize)]
205struct JsonRpcError {
206 code: i64,
208 message: String,
210}
211
212#[derive(Debug, Serialize)]
214struct LatestBlockhashRequest<'request> {
215 jsonrpc: &'request str,
217 id: u64,
219 method: &'request str,
221 params: [LatestBlockhashRequestConfig<'request>; 1],
223}
224
225#[derive(Debug, Serialize)]
227struct LatestBlockhashRequestConfig<'request> {
228 commitment: &'request str,
230}
231
232async fn fetch_latest_blockhash(
234 client: &reqwest::Client,
235 rpc_url: &str,
236) -> Result<[u8; 32], SubmitTransportError> {
237 let payload = LatestBlockhashRequest {
238 jsonrpc: "2.0",
239 id: 1,
240 method: "getLatestBlockhash",
241 params: [LatestBlockhashRequestConfig {
242 commitment: "processed",
243 }],
244 };
245 let response = client
246 .post(rpc_url)
247 .json(&payload)
248 .send()
249 .await
250 .map_err(|error| SubmitTransportError::Failure {
251 message: error.to_string(),
252 })?;
253 let response = response
254 .error_for_status()
255 .map_err(|error| SubmitTransportError::Failure {
256 message: error.to_string(),
257 })?;
258 let parsed: LatestBlockhashRpcResponse =
259 response
260 .json()
261 .await
262 .map_err(|error| SubmitTransportError::Failure {
263 message: error.to_string(),
264 })?;
265 if let Some(result) = parsed.result {
266 return parse_blockhash(&result.value.blockhash);
267 }
268 if let Some(error) = parsed.error {
269 return Err(SubmitTransportError::Failure {
270 message: format!("rpc error {}: {}", error.code, error.message),
271 });
272 }
273 Err(SubmitTransportError::Failure {
274 message: "rpc returned neither result nor error".to_owned(),
275 })
276}
277
278fn parse_blockhash(blockhash: &str) -> Result<[u8; 32], SubmitTransportError> {
280 let decoded =
281 bs58::decode(blockhash)
282 .into_vec()
283 .map_err(|error| SubmitTransportError::Failure {
284 message: format!("failed to decode recent blockhash: {error}"),
285 })?;
286 let bytes: [u8; 32] = decoded
287 .try_into()
288 .map_err(|_error| SubmitTransportError::Failure {
289 message: "rpc blockhash did not decode to 32 bytes".to_owned(),
290 })?;
291 Ok(bytes)
292}
293
#[cfg(test)]
#[allow(clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };

    /// Accepts one connection, checks that the request mentions `getLatestBlockhash`, and
    /// answers with a canned JSON-RPC success response carrying `blockhash`.
    async fn serve_single_blockhash_response(listener: TcpListener, blockhash: String) {
        let (mut stream, _peer) = match listener.accept().await {
            Ok(accepted) => accepted,
            Err(error) => panic!("{error}"),
        };

        // A single read is used to capture the request.
        let mut buffer = [0_u8; 4096];
        let read = stream.read(&mut buffer).await;
        assert!(read.is_ok());
        let request = String::from_utf8_lossy(&buffer[..read.unwrap_or(0)]);
        assert!(request.contains("getLatestBlockhash"));

        let body = format!(
            "{{\"jsonrpc\":\"2.0\",\"result\":{{\"value\":{{\"blockhash\":\"{blockhash}\"}}}},\"id\":1}}"
        );
        let response = format!(
            "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
            body.len(),
            body
        );
        let written = stream.write_all(response.as_bytes()).await;
        assert!(written.is_ok());
    }

    /// The provider starts empty and, after `refresh`, exposes the blockhash served by
    /// the local stub RPC server.
    #[tokio::test]
    async fn rpc_recent_blockhash_provider_fetches_initial_value() {
        let expected = [9_u8; 32];
        let blockhash = bs58::encode(expected).into_string();

        // Bind to an ephemeral local port and remember where the stub listens.
        let listener = match TcpListener::bind("127.0.0.1:0").await {
            Ok(listener) => listener,
            Err(error) => panic!("{error}"),
        };
        let addr = match listener.local_addr() {
            Ok(addr) => addr,
            Err(error) => panic!("{error}"),
        };

        let server = tokio::spawn(serve_single_blockhash_response(listener, blockhash));

        let provider = match RpcRecentBlockhashProvider::with_config(
            format!("http://{addr}"),
            &RpcRecentBlockhashProviderConfig::default(),
        ) {
            Ok(provider) => provider,
            Err(error) => panic!("{error}"),
        };
        assert_eq!(provider.latest_blockhash(), None);

        let refreshed = provider.refresh().await;
        assert!(refreshed.is_ok());
        assert_eq!(refreshed.unwrap_or([0_u8; 32]), expected);
        assert_eq!(provider.latest_blockhash(), Some(expected));

        // A panic inside the stub server surfaces here as a join error.
        let joined = server.await;
        assert!(joined.is_ok());
    }
}