1use {
5 async_lock::Mutex,
6 async_trait::async_trait,
7 futures::future::TryFutureExt,
8 log::*,
9 quinn::{
10 crypto::rustls::QuicClientConfig, ClientConfig, ClosedStream, ConnectError, Connection,
11 ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig,
12 WriteError,
13 },
14 solana_connection_cache::{
15 client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
16 nonblocking::client_connection::ClientConnection,
17 },
18 solana_keypair::Keypair,
19 solana_measure::measure::Measure,
20 solana_net_utils::{SocketConfig, VALIDATOR_PORT_RANGE},
21 solana_quic_definitions::{
22 QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, QUIC_SEND_FAIRNESS,
23 },
24 solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
25 solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID,
26 solana_tls_utils::{
27 new_dummy_x509_certificate, socket_addr_to_quic_server_name, tls_client_config_builder,
28 QuicClientCertificate,
29 },
30 solana_transaction_error::TransportResult,
31 std::{
32 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
33 sync::{atomic::Ordering, Arc},
34 thread,
35 },
36 thiserror::Error,
37 tokio::{sync::OnceCell, time::timeout},
38};
39
40pub struct QuicLazyInitializedEndpoint {
42 endpoint: OnceCell<Arc<Endpoint>>,
43 client_certificate: Arc<QuicClientCertificate>,
44 client_endpoint: Option<Endpoint>,
45}
46
47#[derive(Error, Debug)]
48pub enum QuicError {
49 #[error(transparent)]
50 WriteError(#[from] WriteError),
51 #[error(transparent)]
52 ConnectionError(#[from] ConnectionError),
53 #[error(transparent)]
54 ConnectError(#[from] ConnectError),
55 #[error(transparent)]
56 ClosedStream(#[from] ClosedStream),
57}
58
59impl From<QuicError> for ClientErrorKind {
60 fn from(quic_error: QuicError) -> Self {
61 Self::Custom(format!("{quic_error:?}"))
62 }
63}
64
65impl QuicLazyInitializedEndpoint {
66 pub fn new(
67 client_certificate: Arc<QuicClientCertificate>,
68 client_endpoint: Option<Endpoint>,
69 ) -> Self {
70 Self {
71 endpoint: OnceCell::<Arc<Endpoint>>::new(),
72 client_certificate,
73 client_endpoint,
74 }
75 }
76
77 fn create_endpoint(&self) -> Endpoint {
78 let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
79 endpoint.clone()
80 } else {
81 let config = SocketConfig::default();
82 let client_socket = solana_net_utils::bind_in_range_with_config(
83 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
84 VALIDATOR_PORT_RANGE,
85 config,
86 )
87 .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
88 .1;
89 info!("Local endpoint is : {client_socket:?}");
90
91 QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
92 };
93
94 let mut crypto = tls_client_config_builder()
95 .with_client_auth_cert(
96 vec![self.client_certificate.certificate.clone()],
97 self.client_certificate.key.clone_key(),
98 )
99 .expect("Failed to set QUIC client certificates");
100 crypto.enable_early_data = true;
101 crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
102
103 let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).unwrap()));
104 let mut transport_config = TransportConfig::default();
105
106 let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
107 transport_config.max_idle_timeout(Some(timeout));
108 transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
109 transport_config.send_fairness(QUIC_SEND_FAIRNESS);
110 config.transport_config(Arc::new(transport_config));
111
112 endpoint.set_default_client_config(config);
113
114 endpoint
115 }
116
117 async fn get_endpoint(&self) -> Arc<Endpoint> {
118 self.endpoint
119 .get_or_init(|| async { Arc::new(self.create_endpoint()) })
120 .await
121 .clone()
122 }
123}
124
125impl Default for QuicLazyInitializedEndpoint {
126 fn default() -> Self {
127 let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
128 Self::new(
129 Arc::new(QuicClientCertificate {
130 certificate: cert,
131 key: priv_key,
132 }),
133 None,
134 )
135 }
136}
137
138#[derive(Clone)]
141struct QuicNewConnection {
142 endpoint: Arc<Endpoint>,
143 connection: Arc<Connection>,
144}
145
146impl QuicNewConnection {
147 async fn make_connection(
149 endpoint: Arc<QuicLazyInitializedEndpoint>,
150 addr: SocketAddr,
151 stats: &ClientStats,
152 ) -> Result<Self, QuicError> {
153 let mut make_connection_measure = Measure::start("make_connection_measure");
154 let endpoint = endpoint.get_endpoint().await;
155 let server_name = socket_addr_to_quic_server_name(addr);
156 let connecting = endpoint.connect(addr, &server_name)?;
157 stats.total_connections.fetch_add(1, Ordering::Relaxed);
158 if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
159 {
160 if connecting_result.is_err() {
161 stats.connection_errors.fetch_add(1, Ordering::Relaxed);
162 }
163 make_connection_measure.stop();
164 stats
165 .make_connection_ms
166 .fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed);
167
168 let connection = connecting_result?;
169
170 Ok(Self {
171 endpoint,
172 connection: Arc::new(connection),
173 })
174 } else {
175 Err(ConnectionError::TimedOut.into())
176 }
177 }
178
179 fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
180 quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
181 .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
182 }
183
184 async fn make_connection_0rtt(
187 &mut self,
188 addr: SocketAddr,
189 stats: &ClientStats,
190 ) -> Result<Arc<Connection>, QuicError> {
191 let server_name = socket_addr_to_quic_server_name(addr);
192 let connecting = self.endpoint.connect(addr, &server_name)?;
193 stats.total_connections.fetch_add(1, Ordering::Relaxed);
194 let connection = match connecting.into_0rtt() {
195 Ok((connection, zero_rtt)) => {
196 if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
197 if zero_rtt {
198 stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
199 } else {
200 stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
201 }
202 connection
203 } else {
204 return Err(ConnectionError::TimedOut.into());
205 }
206 }
207 Err(connecting) => {
208 stats.connection_errors.fetch_add(1, Ordering::Relaxed);
209
210 if let Ok(connecting_result) =
211 timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
212 {
213 connecting_result?
214 } else {
215 return Err(ConnectionError::TimedOut.into());
216 }
217 }
218 };
219 self.connection = Arc::new(connection);
220 Ok(self.connection.clone())
221 }
222}
223
224pub struct QuicClient {
225 endpoint: Arc<QuicLazyInitializedEndpoint>,
226 connection: Arc<Mutex<Option<QuicNewConnection>>>,
227 addr: SocketAddr,
228 stats: Arc<ClientStats>,
229}
230
231impl QuicClient {
232 pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
233 Self {
234 endpoint,
235 connection: Arc::new(Mutex::new(None)),
236 addr,
237 stats: Arc::new(ClientStats::default()),
238 }
239 }
240
241 async fn _send_buffer_using_conn(
242 data: &[u8],
243 connection: &Connection,
244 ) -> Result<(), QuicError> {
245 let mut send_stream = connection.open_uni().await?;
246 send_stream.write_all(data).await?;
247 Ok(())
248 }
249
250 async fn _send_buffer(
253 &self,
254 data: &[u8],
255 stats: &ClientStats,
256 connection_stats: Arc<ConnectionCacheStats>,
257 ) -> Result<Arc<Connection>, QuicError> {
258 let mut measure_send_packet = Measure::start("send_packet_us");
259 let mut measure_prepare_connection = Measure::start("prepare_connection");
260 let mut connection_try_count = 0;
261 let mut last_connection_id = 0;
262 let mut last_error = None;
263 while connection_try_count < 2 {
264 let connection = {
265 let mut conn_guard = self.connection.lock().await;
266
267 let maybe_conn = conn_guard.as_mut();
268 match maybe_conn {
269 Some(conn) => {
270 if conn.connection.stable_id() == last_connection_id {
271 let conn = conn.make_connection_0rtt(self.addr, stats).await;
273 match conn {
274 Ok(conn) => {
275 info!(
276 "Made 0rtt connection to {} with id {} try_count {}, last_connection_id: {}, last_error: {:?}",
277 self.addr,
278 conn.stable_id(),
279 connection_try_count,
280 last_connection_id,
281 last_error,
282 );
283 connection_try_count += 1;
284 conn
285 }
286 Err(err) => {
287 info!(
288 "Cannot make 0rtt connection to {}, error {:}",
289 self.addr, err
290 );
291 return Err(err);
292 }
293 }
294 } else {
295 stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
296 conn.connection.clone()
297 }
298 }
299 None => {
300 let conn = QuicNewConnection::make_connection(
301 self.endpoint.clone(),
302 self.addr,
303 stats,
304 )
305 .await;
306 match conn {
307 Ok(conn) => {
308 *conn_guard = Some(conn.clone());
309 info!(
310 "Made connection to {} id {} try_count {}, from connection cache warming?: {}",
311 self.addr,
312 conn.connection.stable_id(),
313 connection_try_count,
314 data.is_empty(),
315 );
316 connection_try_count += 1;
317 conn.connection.clone()
318 }
319 Err(err) => {
320 info!("Cannot make connection to {}, error {:}, from connection cache warming?: {}",
321 self.addr, err, data.is_empty());
322 return Err(err);
323 }
324 }
325 }
326 }
327 };
328
329 let new_stats = connection.stats();
330
331 connection_stats
332 .total_client_stats
333 .congestion_events
334 .update_stat(
335 &self.stats.congestion_events,
336 new_stats.path.congestion_events,
337 );
338
339 connection_stats
340 .total_client_stats
341 .streams_blocked_uni
342 .update_stat(
343 &self.stats.streams_blocked_uni,
344 new_stats.frame_tx.streams_blocked_uni,
345 );
346
347 connection_stats
348 .total_client_stats
349 .data_blocked
350 .update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked);
351
352 connection_stats
353 .total_client_stats
354 .acks
355 .update_stat(&self.stats.acks, new_stats.frame_tx.acks);
356
357 if data.is_empty() {
358 return Ok(connection);
360 }
361
362 last_connection_id = connection.stable_id();
363 measure_prepare_connection.stop();
364
365 match Self::_send_buffer_using_conn(data, &connection).await {
366 Ok(()) => {
367 measure_send_packet.stop();
368 stats.successful_packets.fetch_add(1, Ordering::Relaxed);
369 stats
370 .send_packets_us
371 .fetch_add(measure_send_packet.as_us(), Ordering::Relaxed);
372 stats
373 .prepare_connection_us
374 .fetch_add(measure_prepare_connection.as_us(), Ordering::Relaxed);
375 trace!(
376 "Succcessfully sent to {} with id {}, thread: {:?}, data len: {}, send_packet_us: {} prepare_connection_us: {}",
377 self.addr,
378 connection.stable_id(),
379 thread::current().id(),
380 data.len(),
381 measure_send_packet.as_us(),
382 measure_prepare_connection.as_us(),
383 );
384
385 return Ok(connection);
386 }
387 Err(err) => match err {
388 QuicError::ConnectionError(_) => {
389 last_error = Some(err);
390 }
391 _ => {
392 info!(
393 "Error sending to {} with id {}, error {:?} thread: {:?}",
394 self.addr,
395 connection.stable_id(),
396 err,
397 thread::current().id(),
398 );
399 return Err(err);
400 }
401 },
402 }
403 }
404
405 info!(
407 "Ran into an error sending data {:?}, exhausted retries to {}",
408 last_error, self.addr
409 );
410 Err(last_error.expect("QuicClient::_send_buffer last_error.expect"))
413 }
414
415 pub async fn send_buffer<T>(
416 &self,
417 data: T,
418 stats: &ClientStats,
419 connection_stats: Arc<ConnectionCacheStats>,
420 ) -> Result<(), ClientErrorKind>
421 where
422 T: AsRef<[u8]>,
423 {
424 self._send_buffer(data.as_ref(), stats, connection_stats)
425 .await
426 .map_err(Into::<ClientErrorKind>::into)?;
427 Ok(())
428 }
429
430 pub async fn send_batch<T>(
431 &self,
432 buffers: &[T],
433 stats: &ClientStats,
434 connection_stats: Arc<ConnectionCacheStats>,
435 ) -> Result<(), ClientErrorKind>
436 where
437 T: AsRef<[u8]>,
438 {
439 if buffers.is_empty() {
451 return Ok(());
452 }
453 let connection = self
454 ._send_buffer(buffers[0].as_ref(), stats, connection_stats)
455 .await
456 .map_err(Into::<ClientErrorKind>::into)?;
457
458 for data in buffers[1..buffers.len()].iter() {
459 Self::_send_buffer_using_conn(data.as_ref(), &connection).await?;
460 }
461 Ok(())
462 }
463
464 pub fn server_addr(&self) -> &SocketAddr {
465 &self.addr
466 }
467
468 pub fn stats(&self) -> Arc<ClientStats> {
469 self.stats.clone()
470 }
471}
472
473pub struct QuicClientConnection {
474 pub client: Arc<QuicClient>,
475 pub connection_stats: Arc<ConnectionCacheStats>,
476}
477
478impl QuicClientConnection {
479 pub fn base_stats(&self) -> Arc<ClientStats> {
480 self.client.stats()
481 }
482
483 pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
484 self.connection_stats.clone()
485 }
486
487 pub fn new(
488 endpoint: Arc<QuicLazyInitializedEndpoint>,
489 addr: SocketAddr,
490 connection_stats: Arc<ConnectionCacheStats>,
491 ) -> Self {
492 let client = Arc::new(QuicClient::new(endpoint, addr));
493 Self::new_with_client(client, connection_stats)
494 }
495
496 pub fn new_with_client(
497 client: Arc<QuicClient>,
498 connection_stats: Arc<ConnectionCacheStats>,
499 ) -> Self {
500 Self {
501 client,
502 connection_stats,
503 }
504 }
505}
506
507#[async_trait]
508impl ClientConnection for QuicClientConnection {
509 fn server_addr(&self) -> &SocketAddr {
510 self.client.server_addr()
511 }
512
513 async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
514 let stats = ClientStats::default();
515 let len = buffers.len();
516 let res = self
517 .client
518 .send_batch(buffers, &stats, self.connection_stats.clone())
519 .await;
520 self.connection_stats
521 .add_client_stats(&stats, len, res.is_ok());
522 res?;
523 Ok(())
524 }
525
526 async fn send_data(&self, data: &[u8]) -> TransportResult<()> {
527 let stats = Arc::new(ClientStats::default());
528 let num_packets = if data.is_empty() { 0 } else { 1 };
530 self.client
531 .send_buffer(data, &stats, self.connection_stats.clone())
532 .map_ok(|v| {
533 self.connection_stats
534 .add_client_stats(&stats, num_packets, true);
535 v
536 })
537 .map_err(|e| {
538 warn!(
539 "Failed to send data async to {}, error: {:?} ",
540 self.server_addr(),
541 e
542 );
543 datapoint_warn!("send-wire-async", ("failure", 1, i64),);
544 self.connection_stats
545 .add_client_stats(&stats, num_packets, false);
546 e.into()
547 })
548 .await
549 }
550}