1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
use crate::errors::{JitoClientError, JitoClientResult};
use crate::grpc::{
bundle::Bundle,
searcher::{searcher_service_client::SearcherServiceClient, SendBundleRequest},
};
use crate::nodes::NodeRegion;
use futures_timer::Delay;
use solana_transaction::versioned::VersionedTransaction;
use std::time::Duration;
use tonic::transport::{channel::ClientTlsConfig, Channel, Endpoint};
/// gRPC client for submitting transaction bundles through the searcher service.
///
/// Instances are built with [`JitoClient::new`] or
/// [`JitoClient::new_dynamic_region`].
pub struct JitoClient {
    /// Searcher service stub bound to the channel opened at construction time.
    client: SearcherServiceClient<Channel>,
    /// Endpoint URL the channel was opened against.
    endpoint: &'static str,
}
impl JitoClient {
    /// Timeout, in seconds, used when the caller passes `None`.
    const DEFAULT_TIMEOUT_SECS: u64 = 2;

    /// Opens a TLS channel to `endpoint` and wraps it in a client.
    ///
    /// Shared by both public constructors so that channel configuration
    /// (native TLS roots, `TCP_NODELAY`, request and connect timeouts) lives in
    /// one place. The URL is parsed with the fallible `Endpoint::from_shared`,
    /// so an invalid URL is reported as an error rather than a panic.
    async fn connect(endpoint: &'static str, timeout: Option<u64>) -> JitoClientResult<Self> {
        let timeout_dur = Duration::from_secs(timeout.unwrap_or(Self::DEFAULT_TIMEOUT_SECS));
        let channel = Endpoint::from_shared(endpoint)?
            .tls_config(ClientTlsConfig::new().with_native_roots())?
            .tcp_nodelay(true)
            .timeout(timeout_dur)
            .connect_timeout(timeout_dur)
            .connect()
            .await?;
        Ok(Self {
            client: SearcherServiceClient::new(channel),
            endpoint,
        })
    }

    /// Creates a new gRPC client that dynamically determines the fastest endpoint to connect to.
    ///
    /// The endpoint is taken from the result of `NodeRegion::measure_latency`,
    /// and the client is then connected to it.
    ///
    /// # Arguments
    /// * `timeout` - Connection and request timeout in seconds. Defaults to 2 seconds if `None` is passed.
    ///
    /// # Returns
    /// Returns the configured client connected to the selected endpoint.
    ///
    /// # Errors
    /// This function will return an error if:
    /// - Region latency measurement fails
    /// - The selected endpoint URL cannot be parsed
    /// - TLS configuration or connection to the selected endpoint fails
    ///
    /// # Examples
    /// ```rust,ignore
    /// // Use default 2-second timeout
    /// let client = JitoClient::new_dynamic_region(None).await?;
    ///
    /// // Use custom 5-second timeout
    /// let client = JitoClient::new_dynamic_region(Some(5)).await?;
    /// ```
    pub async fn new_dynamic_region(timeout: Option<u64>) -> JitoClientResult<Self> {
        let fastest_endpoint = NodeRegion::measure_latency().await?.0.endpoint();
        Self::connect(fastest_endpoint, timeout).await
    }

    /// Creates a new gRPC client that connects to a specified input endpoint.
    ///
    /// # Arguments
    /// * `endpoint` - The gRPC endpoint URL
    /// * `timeout` - Connection and request timeout in seconds. Defaults to 2 seconds if `None` is passed.
    ///
    /// # Returns
    /// Returns the configured client connected to the endpoint.
    ///
    /// # Errors
    /// This function will return an error if the endpoint URL cannot be parsed,
    /// or if TLS configuration or connection to the endpoint fails.
    ///
    /// # Examples
    /// ```rust,ignore
    /// // Connect with default timeout
    /// let client = JitoClient::new("https://ny.mainnet.block-engine.jito.wtf:443", None).await?;
    ///
    /// // Connect with custom 10-second timeout
    /// let client = JitoClient::new("https://ny.mainnet.block-engine.jito.wtf:443", Some(10)).await?;
    /// ```
    pub async fn new(endpoint: &'static str, timeout: Option<u64>) -> JitoClientResult<Self> {
        Self::connect(endpoint, timeout).await
    }

    /// Sends a bundle of transactions to the node via gRPC.
    ///
    /// # Arguments
    /// * `transactions` - A slice of transactions (`VersionedTransaction`) to be sent
    ///
    /// # Returns
    /// Returns a String containing the bundle ID (`uuid`) from the response.
    ///
    /// # Errors
    /// This function will return an error if:
    /// - `Bundle::create` rejects the transactions
    /// - The `send_bundle` gRPC call fails
    ///
    /// # Examples
    /// ```rust,ignore
    /// let mut client = JitoClient::new_dynamic_region(None).await?;
    ///
    /// let transactions = vec![];
    ///
    /// match client.send(&transactions).await {
    ///     Ok(uuid) => println!("Bundle ID: {}", uuid),
    ///     Err(e) => eprintln!("Failed to send: {}", e),
    /// }
    /// ```
    pub async fn send(
        &mut self,
        transactions: &[VersionedTransaction],
    ) -> JitoClientResult<String> {
        let request = SendBundleRequest {
            bundle: Some(Bundle::create(transactions)?),
        };
        let response = self.client.send_bundle(request).await?;
        Ok(response.into_inner().uuid)
    }

    /// Sends a bundle of transactions with automatic retries.
    ///
    /// # Arguments
    /// * `transactions` - A slice of transactions (`VersionedTransaction`) to be sent
    /// * `retry_logic` - Configuration for retry behavior including max attempts and wait times.
    ///
    /// # Returns
    /// Returns a String containing the bundle ID (`uuid`) from the first
    /// successful response.
    ///
    /// # Errors
    /// This function will return an error if:
    /// - `Bundle::create` rejects the transactions (no send is attempted)
    /// - Every attempt fails, in which case `JitoClientError::MaxRetriesError`
    ///   is returned; individual send errors are only logged
    ///
    /// # Retry Behavior
    /// - `retry_logic.max_retries` bounds the number of failed attempts; at
    ///   least one attempt is always made, even when it is 0
    /// - Waits a random jitter between `min_wait` and `max_wait` milliseconds
    ///   between attempts, but not after the final failed attempt
    /// - Logs debug information for each failed attempt
    ///
    /// # Examples
    /// ```rust,ignore
    /// let mut client = JitoClient::new_dynamic_region(None).await?;
    /// // Up to 3 attempts with default timings
    /// let retry_config = RetryLogic::new(3);
    ///
    /// let transactions = vec![];
    ///
    /// match client.send_with_retry(&transactions, retry_config).await {
    ///     Ok(uuid) => println!("Bundle ID: {}", uuid),
    ///     Err(e) => eprintln!("Failed to send: {}", e),
    /// }
    /// ```
    pub async fn send_with_retry(
        &mut self,
        transactions: &[VersionedTransaction],
        retry_logic: RetryLogic,
    ) -> JitoClientResult<String> {
        // Build the request once; each attempt sends a clone of it.
        let request = SendBundleRequest {
            bundle: Some(Bundle::create(transactions)?),
        };
        let mut failed_attempts = 0u8;
        loop {
            match self.client.send_bundle(request.clone()).await {
                Ok(response) => return Ok(response.into_inner().uuid),
                Err(e) => {
                    log::debug!("Send error: {e}");
                    failed_attempts = failed_attempts.saturating_add(1);
                    // Check the limit before sleeping so the caller is not
                    // kept waiting for a jitter interval that precedes no retry.
                    if failed_attempts >= retry_logic.max_retries {
                        return Err(JitoClientError::MaxRetriesError);
                    }
                    Delay::new(retry_logic.jitter()).await;
                }
            }
        }
    }

    /// Returns the endpoint URL this client was created with.
    pub fn get_endpoint(&self) -> &'static str {
        self.endpoint
    }

    /// Returns the node regions reported by `NodeRegion::all`.
    pub fn all_regions() -> &'static [NodeRegion] {
        NodeRegion::all()
    }
}
/// Retry configuration consumed by `JitoClient::send_with_retry`.
///
/// The type is `Copy`, so a single configuration can be reused across several
/// calls even though `send_with_retry` takes it by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryLogic {
    /// Number of failed attempts after which sending gives up.
    pub max_retries: u8,
    /// Lower bound of the random wait between attempts, in milliseconds.
    pub min_wait: u64,
    /// Upper bound (inclusive) of the random wait between attempts, in milliseconds.
    pub max_wait: u64,
}
impl RetryLogic {
pub fn new(max_retries: u8) -> Self {
Self {
max_retries,
min_wait: 5,
max_wait: 25,
}
}
pub fn new_with_wait_bounds(
max_retries: u8,
min_wait: u64,
max_wait: u64,
) -> JitoClientResult<Self> {
if min_wait >= max_wait {
return Err(JitoClientError::WaitParameterError);
}
Ok(Self {
max_retries,
min_wait,
max_wait,
})
}
pub fn jitter(&self) -> std::time::Duration {
std::time::Duration::from_millis(rand::random_range(self.min_wait..=self.max_wait))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use solana_keypair::{Keypair, Signer};
    use solana_program::{
        hash::Hash,
        instruction::{AccountMeta, Instruction},
        pubkey::Pubkey,
    };
    use solana_system_interface::instruction::transfer;
    use solana_transaction::{Message, VersionedMessage};
    use std::str::FromStr;

    // These tests talk to live block-engine endpoints and therefore need
    // network access; `#[serial]` keeps them from running concurrently.
    const SERVER_URL1: &str = "https://ny.mainnet.block-engine.jito.wtf:443";
    const SERVER_URL2: &str = "https://ny.testnet.block-engine.jito.wtf:443";

    /// Recipient of the transfer instruction in the test transaction.
    const TIP_ACCOUNT: &str = "96gYZGLnJYVFmbjzopPSU6QiEV5fGqZNyN9nmNhvrZU5";
    /// Program id targeted by the first instruction in the test transaction.
    const MEMO_PROGRAM_ID: &str = "Memo1UhkJRfHyvLMcVucJwxXeuD728EqVDDwQDxFMNo";

    /// Builds a signed legacy transaction from a throwaway keypair: one
    /// instruction carrying `b"test"` plus a 100_000-lamport transfer to
    /// `TIP_ACCOUNT`, using a unique placeholder blockhash.
    fn build_signed_transaction() -> VersionedTransaction {
        let signer_keypair = Keypair::new();
        let payer = signer_keypair.pubkey();
        let blockhash = Hash::new_unique();
        let tip_account = Pubkey::from_str(TIP_ACCOUNT).unwrap();
        let memo_ix = Instruction {
            program_id: Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(),
            accounts: vec![AccountMeta::new(payer, true)],
            data: b"test".to_vec(),
        };
        let instructions = vec![memo_ix, transfer(&payer, &tip_account, 100_000)];
        let message = VersionedMessage::Legacy(Message::new_with_blockhash(
            &instructions,
            Some(&payer),
            &blockhash,
        ));
        VersionedTransaction::try_new(message, &[signer_keypair]).unwrap()
    }

    #[tokio::test]
    #[serial]
    async fn custom_endpoint_default_timeout() {
        let client = JitoClient::new(SERVER_URL2, None)
            .await
            .unwrap_or_else(|e| panic!("Error in creating client: {e}"));
        println!("Get Endpoint: {}", client.get_endpoint());
    }

    #[tokio::test]
    #[serial]
    async fn dynamic_region_custom_timeout() {
        let client = JitoClient::new_dynamic_region(Some(5))
            .await
            .unwrap_or_else(|e| panic!("Error in creating client: {e}"));
        println!("Get Endpoint: {}", client.get_endpoint());
    }

    #[tokio::test]
    #[serial]
    async fn send_endpoint() {
        let started = std::time::Instant::now();
        let mut client = JitoClient::new(SERVER_URL1, None)
            .await
            .expect("Failed to create client");
        let transaction = build_signed_transaction();

        // A failed send fails this test.
        let bundle_id = client
            .send(&[transaction])
            .await
            .unwrap_or_else(|e| panic!("Send error: {e}"));
        println!("bundle id: {bundle_id}");
        println!("Elapsed: {} ms", started.elapsed().as_millis());
    }

    #[tokio::test]
    #[serial]
    async fn send_with_retries() {
        let started = std::time::Instant::now();
        let mut client = JitoClient::new(SERVER_URL2, None)
            .await
            .expect("Failed to create client");
        let transaction = build_signed_transaction();

        // Unlike `send_endpoint`, a send failure is only reported here, not
        // treated as a test failure.
        let outcome = client
            .send_with_retry(&[transaction], RetryLogic::new(3))
            .await;
        match outcome {
            Ok(out) => println!("bundle id: {out}"),
            Err(e) => println!("Send error: {e}"),
        }
        println!("Elapsed: {} ms", started.elapsed().as_millis());
    }
}