solana_quic_client/nonblocking/
quic_client.rs

1//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol
2//! and provides an interface for sending data which is restricted by the
3//! server's flow control.
4use {
5    async_lock::Mutex,
6    async_trait::async_trait,
7    futures::future::TryFutureExt,
8    log::*,
9    quinn::{
10        crypto::rustls::QuicClientConfig, ClientConfig, ClosedStream, ConnectError, Connection,
11        ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig,
12        WriteError,
13    },
14    solana_connection_cache::{
15        client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
16        nonblocking::client_connection::ClientConnection,
17    },
18    solana_keypair::Keypair,
19    solana_measure::measure::Measure,
20    solana_net_utils::{SocketConfig, VALIDATOR_PORT_RANGE},
21    solana_quic_definitions::{
22        QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, QUIC_SEND_FAIRNESS,
23    },
24    solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
25    solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID,
26    solana_tls_utils::{
27        new_dummy_x509_certificate, socket_addr_to_quic_server_name, tls_client_config_builder,
28        QuicClientCertificate,
29    },
30    solana_transaction_error::TransportResult,
31    std::{
32        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
33        sync::{atomic::Ordering, Arc},
34        thread,
35    },
36    thiserror::Error,
37    tokio::{sync::OnceCell, time::timeout},
38};
39
40/// A lazy-initialized Quic Endpoint
41pub struct QuicLazyInitializedEndpoint {
42    endpoint: OnceCell<Arc<Endpoint>>,
43    client_certificate: Arc<QuicClientCertificate>,
44    client_endpoint: Option<Endpoint>,
45}
46
47#[derive(Error, Debug)]
48pub enum QuicError {
49    #[error(transparent)]
50    WriteError(#[from] WriteError),
51    #[error(transparent)]
52    ConnectionError(#[from] ConnectionError),
53    #[error(transparent)]
54    ConnectError(#[from] ConnectError),
55    #[error(transparent)]
56    ClosedStream(#[from] ClosedStream),
57}
58
59impl From<QuicError> for ClientErrorKind {
60    fn from(quic_error: QuicError) -> Self {
61        Self::Custom(format!("{quic_error:?}"))
62    }
63}
64
65impl QuicLazyInitializedEndpoint {
66    pub fn new(
67        client_certificate: Arc<QuicClientCertificate>,
68        client_endpoint: Option<Endpoint>,
69    ) -> Self {
70        Self {
71            endpoint: OnceCell::<Arc<Endpoint>>::new(),
72            client_certificate,
73            client_endpoint,
74        }
75    }
76
77    fn create_endpoint(&self) -> Endpoint {
78        let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
79            endpoint.clone()
80        } else {
81            let config = SocketConfig::default();
82            let client_socket = solana_net_utils::bind_in_range_with_config(
83                IpAddr::V4(Ipv4Addr::UNSPECIFIED),
84                VALIDATOR_PORT_RANGE,
85                config,
86            )
87            .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
88            .1;
89            info!("Local endpoint is : {client_socket:?}");
90
91            QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
92        };
93
94        let mut crypto = tls_client_config_builder()
95            .with_client_auth_cert(
96                vec![self.client_certificate.certificate.clone()],
97                self.client_certificate.key.clone_key(),
98            )
99            .expect("Failed to set QUIC client certificates");
100        crypto.enable_early_data = true;
101        crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
102
103        let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).unwrap()));
104        let mut transport_config = TransportConfig::default();
105
106        let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
107        transport_config.max_idle_timeout(Some(timeout));
108        transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
109        transport_config.send_fairness(QUIC_SEND_FAIRNESS);
110        config.transport_config(Arc::new(transport_config));
111
112        endpoint.set_default_client_config(config);
113
114        endpoint
115    }
116
117    async fn get_endpoint(&self) -> Arc<Endpoint> {
118        self.endpoint
119            .get_or_init(|| async { Arc::new(self.create_endpoint()) })
120            .await
121            .clone()
122    }
123}
124
125impl Default for QuicLazyInitializedEndpoint {
126    fn default() -> Self {
127        let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
128        Self::new(
129            Arc::new(QuicClientCertificate {
130                certificate: cert,
131                key: priv_key,
132            }),
133            None,
134        )
135    }
136}
137
138/// A wrapper over NewConnection with additional capability to create the endpoint as part
139/// of creating a new connection.
140#[derive(Clone)]
141struct QuicNewConnection {
142    endpoint: Arc<Endpoint>,
143    connection: Arc<Connection>,
144}
145
146impl QuicNewConnection {
147    /// Create a QuicNewConnection given the remote address 'addr'.
148    async fn make_connection(
149        endpoint: Arc<QuicLazyInitializedEndpoint>,
150        addr: SocketAddr,
151        stats: &ClientStats,
152    ) -> Result<Self, QuicError> {
153        let mut make_connection_measure = Measure::start("make_connection_measure");
154        let endpoint = endpoint.get_endpoint().await;
155        let server_name = socket_addr_to_quic_server_name(addr);
156        let connecting = endpoint.connect(addr, &server_name)?;
157        stats.total_connections.fetch_add(1, Ordering::Relaxed);
158        if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
159        {
160            if connecting_result.is_err() {
161                stats.connection_errors.fetch_add(1, Ordering::Relaxed);
162            }
163            make_connection_measure.stop();
164            stats
165                .make_connection_ms
166                .fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed);
167
168            let connection = connecting_result?;
169
170            Ok(Self {
171                endpoint,
172                connection: Arc::new(connection),
173            })
174        } else {
175            Err(ConnectionError::TimedOut.into())
176        }
177    }
178
179    fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
180        quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
181            .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
182    }
183
184    // Attempts to make a faster connection by taking advantage of pre-existing key material.
185    // Only works if connection to this endpoint was previously established.
186    async fn make_connection_0rtt(
187        &mut self,
188        addr: SocketAddr,
189        stats: &ClientStats,
190    ) -> Result<Arc<Connection>, QuicError> {
191        let server_name = socket_addr_to_quic_server_name(addr);
192        let connecting = self.endpoint.connect(addr, &server_name)?;
193        stats.total_connections.fetch_add(1, Ordering::Relaxed);
194        let connection = match connecting.into_0rtt() {
195            Ok((connection, zero_rtt)) => {
196                if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
197                    if zero_rtt {
198                        stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
199                    } else {
200                        stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
201                    }
202                    connection
203                } else {
204                    return Err(ConnectionError::TimedOut.into());
205                }
206            }
207            Err(connecting) => {
208                stats.connection_errors.fetch_add(1, Ordering::Relaxed);
209
210                if let Ok(connecting_result) =
211                    timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
212                {
213                    connecting_result?
214                } else {
215                    return Err(ConnectionError::TimedOut.into());
216                }
217            }
218        };
219        self.connection = Arc::new(connection);
220        Ok(self.connection.clone())
221    }
222}
223
224pub struct QuicClient {
225    endpoint: Arc<QuicLazyInitializedEndpoint>,
226    connection: Arc<Mutex<Option<QuicNewConnection>>>,
227    addr: SocketAddr,
228    stats: Arc<ClientStats>,
229}
230
231impl QuicClient {
232    pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
233        Self {
234            endpoint,
235            connection: Arc::new(Mutex::new(None)),
236            addr,
237            stats: Arc::new(ClientStats::default()),
238        }
239    }
240
241    async fn _send_buffer_using_conn(
242        data: &[u8],
243        connection: &Connection,
244    ) -> Result<(), QuicError> {
245        let mut send_stream = connection.open_uni().await?;
246        send_stream.write_all(data).await?;
247        Ok(())
248    }
249
250    // Attempts to send data, connecting/reconnecting as necessary
251    // On success, returns the connection used to successfully send the data
252    async fn _send_buffer(
253        &self,
254        data: &[u8],
255        stats: &ClientStats,
256        connection_stats: Arc<ConnectionCacheStats>,
257    ) -> Result<Arc<Connection>, QuicError> {
258        let mut measure_send_packet = Measure::start("send_packet_us");
259        let mut measure_prepare_connection = Measure::start("prepare_connection");
260        let mut connection_try_count = 0;
261        let mut last_connection_id = 0;
262        let mut last_error = None;
263        while connection_try_count < 2 {
264            let connection = {
265                let mut conn_guard = self.connection.lock().await;
266
267                let maybe_conn = conn_guard.as_mut();
268                match maybe_conn {
269                    Some(conn) => {
270                        if conn.connection.stable_id() == last_connection_id {
271                            // this is the problematic connection we had used before, create a new one
272                            let conn = conn.make_connection_0rtt(self.addr, stats).await;
273                            match conn {
274                                Ok(conn) => {
275                                    info!(
276                                        "Made 0rtt connection to {} with id {} try_count {}, last_connection_id: {}, last_error: {:?}",
277                                        self.addr,
278                                        conn.stable_id(),
279                                        connection_try_count,
280                                        last_connection_id,
281                                        last_error,
282                                    );
283                                    connection_try_count += 1;
284                                    conn
285                                }
286                                Err(err) => {
287                                    info!(
288                                        "Cannot make 0rtt connection to {}, error {:}",
289                                        self.addr, err
290                                    );
291                                    return Err(err);
292                                }
293                            }
294                        } else {
295                            stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
296                            conn.connection.clone()
297                        }
298                    }
299                    None => {
300                        let conn = QuicNewConnection::make_connection(
301                            self.endpoint.clone(),
302                            self.addr,
303                            stats,
304                        )
305                        .await;
306                        match conn {
307                            Ok(conn) => {
308                                *conn_guard = Some(conn.clone());
309                                info!(
310                                    "Made connection to {} id {} try_count {}, from connection cache warming?: {}",
311                                    self.addr,
312                                    conn.connection.stable_id(),
313                                    connection_try_count,
314                                    data.is_empty(),
315                                );
316                                connection_try_count += 1;
317                                conn.connection.clone()
318                            }
319                            Err(err) => {
320                                info!("Cannot make connection to {}, error {:}, from connection cache warming?: {}",
321                                    self.addr, err, data.is_empty());
322                                return Err(err);
323                            }
324                        }
325                    }
326                }
327            };
328
329            let new_stats = connection.stats();
330
331            connection_stats
332                .total_client_stats
333                .congestion_events
334                .update_stat(
335                    &self.stats.congestion_events,
336                    new_stats.path.congestion_events,
337                );
338
339            connection_stats
340                .total_client_stats
341                .streams_blocked_uni
342                .update_stat(
343                    &self.stats.streams_blocked_uni,
344                    new_stats.frame_tx.streams_blocked_uni,
345                );
346
347            connection_stats
348                .total_client_stats
349                .data_blocked
350                .update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked);
351
352            connection_stats
353                .total_client_stats
354                .acks
355                .update_stat(&self.stats.acks, new_stats.frame_tx.acks);
356
357            if data.is_empty() {
358                // no need to send packet as it is only for warming connections
359                return Ok(connection);
360            }
361
362            last_connection_id = connection.stable_id();
363            measure_prepare_connection.stop();
364
365            match Self::_send_buffer_using_conn(data, &connection).await {
366                Ok(()) => {
367                    measure_send_packet.stop();
368                    stats.successful_packets.fetch_add(1, Ordering::Relaxed);
369                    stats
370                        .send_packets_us
371                        .fetch_add(measure_send_packet.as_us(), Ordering::Relaxed);
372                    stats
373                        .prepare_connection_us
374                        .fetch_add(measure_prepare_connection.as_us(), Ordering::Relaxed);
375                    trace!(
376                        "Succcessfully sent to {} with id {}, thread: {:?}, data len: {}, send_packet_us: {} prepare_connection_us: {}",
377                        self.addr,
378                        connection.stable_id(),
379                        thread::current().id(),
380                        data.len(),
381                        measure_send_packet.as_us(),
382                        measure_prepare_connection.as_us(),
383                    );
384
385                    return Ok(connection);
386                }
387                Err(err) => match err {
388                    QuicError::ConnectionError(_) => {
389                        last_error = Some(err);
390                    }
391                    _ => {
392                        info!(
393                            "Error sending to {} with id {}, error {:?} thread: {:?}",
394                            self.addr,
395                            connection.stable_id(),
396                            err,
397                            thread::current().id(),
398                        );
399                        return Err(err);
400                    }
401                },
402            }
403        }
404
405        // if we come here, that means we have exhausted maximum retries, return the error
406        info!(
407            "Ran into an error sending data {:?}, exhausted retries to {}",
408            last_error, self.addr
409        );
410        // If we get here but last_error is None, then we have a logic error
411        // in this function, so panic here with an expect to help debugging
412        Err(last_error.expect("QuicClient::_send_buffer last_error.expect"))
413    }
414
415    pub async fn send_buffer<T>(
416        &self,
417        data: T,
418        stats: &ClientStats,
419        connection_stats: Arc<ConnectionCacheStats>,
420    ) -> Result<(), ClientErrorKind>
421    where
422        T: AsRef<[u8]>,
423    {
424        self._send_buffer(data.as_ref(), stats, connection_stats)
425            .await
426            .map_err(Into::<ClientErrorKind>::into)?;
427        Ok(())
428    }
429
430    pub async fn send_batch<T>(
431        &self,
432        buffers: &[T],
433        stats: &ClientStats,
434        connection_stats: Arc<ConnectionCacheStats>,
435    ) -> Result<(), ClientErrorKind>
436    where
437        T: AsRef<[u8]>,
438    {
439        // Start off by "testing" the connection by sending the first buffer
440        // This will also connect to the server if not already connected
441        // and reconnect and retry if the first send attempt failed
442        // (for example due to a timed out connection), returning an error
443        // or the connection that was used to successfully send the buffer.
444        // We will use the returned connection to send the rest of the buffers in the batch
445        // to avoid touching the mutex in self, and not bother reconnecting if we fail along the way
446        // since testing even in the ideal GCE environment has found no cases
447        // where reconnecting and retrying in the middle of a batch send
448        // (i.e. we encounter a connection error in the middle of a batch send, which presumably cannot
449        // be due to a timed out connection) has succeeded
450        if buffers.is_empty() {
451            return Ok(());
452        }
453        let connection = self
454            ._send_buffer(buffers[0].as_ref(), stats, connection_stats)
455            .await
456            .map_err(Into::<ClientErrorKind>::into)?;
457
458        for data in buffers[1..buffers.len()].iter() {
459            Self::_send_buffer_using_conn(data.as_ref(), &connection).await?;
460        }
461        Ok(())
462    }
463
464    pub fn server_addr(&self) -> &SocketAddr {
465        &self.addr
466    }
467
468    pub fn stats(&self) -> Arc<ClientStats> {
469        self.stats.clone()
470    }
471}
472
473pub struct QuicClientConnection {
474    pub client: Arc<QuicClient>,
475    pub connection_stats: Arc<ConnectionCacheStats>,
476}
477
478impl QuicClientConnection {
479    pub fn base_stats(&self) -> Arc<ClientStats> {
480        self.client.stats()
481    }
482
483    pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
484        self.connection_stats.clone()
485    }
486
487    pub fn new(
488        endpoint: Arc<QuicLazyInitializedEndpoint>,
489        addr: SocketAddr,
490        connection_stats: Arc<ConnectionCacheStats>,
491    ) -> Self {
492        let client = Arc::new(QuicClient::new(endpoint, addr));
493        Self::new_with_client(client, connection_stats)
494    }
495
496    pub fn new_with_client(
497        client: Arc<QuicClient>,
498        connection_stats: Arc<ConnectionCacheStats>,
499    ) -> Self {
500        Self {
501            client,
502            connection_stats,
503        }
504    }
505}
506
507#[async_trait]
508impl ClientConnection for QuicClientConnection {
509    fn server_addr(&self) -> &SocketAddr {
510        self.client.server_addr()
511    }
512
513    async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
514        let stats = ClientStats::default();
515        let len = buffers.len();
516        let res = self
517            .client
518            .send_batch(buffers, &stats, self.connection_stats.clone())
519            .await;
520        self.connection_stats
521            .add_client_stats(&stats, len, res.is_ok());
522        res?;
523        Ok(())
524    }
525
526    async fn send_data(&self, data: &[u8]) -> TransportResult<()> {
527        let stats = Arc::new(ClientStats::default());
528        // When data is empty which is from cache warmer, we are not sending packets actually, do not count it in
529        let num_packets = if data.is_empty() { 0 } else { 1 };
530        self.client
531            .send_buffer(data, &stats, self.connection_stats.clone())
532            .map_ok(|v| {
533                self.connection_stats
534                    .add_client_stats(&stats, num_packets, true);
535                v
536            })
537            .map_err(|e| {
538                warn!(
539                    "Failed to send data async to {}, error: {:?} ",
540                    self.server_addr(),
541                    e
542                );
543                datapoint_warn!("send-wire-async", ("failure", 1, i64),);
544                self.connection_stats
545                    .add_client_stats(&stats, num_packets, false);
546                e.into()
547            })
548            .await
549    }
550}