solana_quic_client/
lib.rs

1#![cfg_attr(
2    not(feature = "agave-unstable-api"),
3    deprecated(
4        since = "3.1.0",
5        note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
6                v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
7                acknowledge use of an interface that may break without warning."
8    )
9)]
10#![allow(clippy::arithmetic_side_effects)]
11
12pub mod nonblocking;
13pub mod quic_client;
14
15#[macro_use]
16extern crate solana_metrics;
17
18use {
19    crate::{
20        nonblocking::quic_client::{
21            QuicClient, QuicClientConnection as NonblockingQuicClientConnection,
22            QuicLazyInitializedEndpoint,
23        },
24        quic_client::{
25            close_quic_connection, QuicClientConnection as BlockingQuicClientConnection,
26        },
27    },
28    log::debug,
29    quic_client::get_runtime,
30    quinn::{Endpoint, EndpointConfig, TokioRuntime},
31    solana_connection_cache::{
32        connection_cache::{
33            BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
34            ConnectionPoolError, NewConnectionConfig, Protocol,
35        },
36        connection_cache_stats::ConnectionCacheStats,
37    },
38    solana_keypair::Keypair,
39    solana_pubkey::Pubkey,
40    solana_signer::Signer,
41    solana_streamer::streamer::StakedNodes,
42    solana_tls_utils::{new_dummy_x509_certificate, QuicClientCertificate},
43    std::{
44        net::{IpAddr, SocketAddr, UdpSocket},
45        sync::{Arc, RwLock},
46    },
47};
48
49pub struct QuicPool {
50    connections: Vec<Arc<Quic>>,
51    endpoint: Arc<QuicLazyInitializedEndpoint>,
52}
53impl ConnectionPool for QuicPool {
54    type BaseClientConnection = Quic;
55    type NewConnectionConfig = QuicConfig;
56
57    fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
58        let connection = self.create_pool_entry(config, addr);
59        let idx = self.connections.len();
60        self.connections.push(connection);
61        idx
62    }
63
64    fn num_connections(&self) -> usize {
65        self.connections.len()
66    }
67
68    fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
69        self.connections
70            .get(index)
71            .cloned()
72            .ok_or(ConnectionPoolError::IndexOutOfRange)
73    }
74
75    fn create_pool_entry(
76        &self,
77        _config: &Self::NewConnectionConfig,
78        addr: &SocketAddr,
79    ) -> Arc<Self::BaseClientConnection> {
80        Arc::new(Quic(Arc::new(QuicClient::new(
81            self.endpoint.clone(),
82            *addr,
83        ))))
84    }
85}
86
87impl Drop for QuicPool {
88    fn drop(&mut self) {
89        debug!(
90            "Dropping QuicPool with {} connections",
91            self.connections.len()
92        );
93        for connection in self.connections.drain(..) {
94            // Explicitly drop each connection to ensure resources are released
95            close_quic_connection(connection.0.clone());
96        }
97    }
98}
99
100pub struct QuicConfig {
101    // Arc to prevent having to copy the struct
102    client_certificate: RwLock<Arc<QuicClientCertificate>>,
103    maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
104    maybe_client_pubkey: Option<Pubkey>,
105
106    // The optional specified endpoint for the quic based client connections
107    // If not specified, the connection cache will create as needed.
108    client_endpoint: Option<Endpoint>,
109}
110
111impl Clone for QuicConfig {
112    fn clone(&self) -> Self {
113        let cert_guard = self.client_certificate.read().unwrap();
114        QuicConfig {
115            client_certificate: RwLock::new(cert_guard.clone()),
116            maybe_staked_nodes: self.maybe_staked_nodes.clone(),
117            maybe_client_pubkey: self.maybe_client_pubkey,
118            client_endpoint: self.client_endpoint.clone(),
119        }
120    }
121}
122
123impl NewConnectionConfig for QuicConfig {
124    fn new() -> Result<Self, ClientError> {
125        let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
126        Ok(Self {
127            client_certificate: RwLock::new(Arc::new(QuicClientCertificate {
128                certificate: cert,
129                key: priv_key,
130            })),
131            maybe_staked_nodes: None,
132            maybe_client_pubkey: None,
133            client_endpoint: None,
134        })
135    }
136}
137
138impl QuicConfig {
139    fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
140        let cert_guard = self.client_certificate.read().unwrap();
141        QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
142    }
143
144    pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) {
145        let (cert, priv_key) = new_dummy_x509_certificate(keypair);
146
147        let mut cert_guard = self.client_certificate.write().unwrap();
148
149        *cert_guard = Arc::new(QuicClientCertificate {
150            certificate: cert,
151            key: priv_key,
152        });
153    }
154
155    pub fn update_keypair(&self, keypair: &Keypair) {
156        let (cert, priv_key) = new_dummy_x509_certificate(keypair);
157
158        let mut cert_guard = self.client_certificate.write().unwrap();
159
160        *cert_guard = Arc::new(QuicClientCertificate {
161            certificate: cert,
162            key: priv_key,
163        });
164    }
165
166    pub fn set_staked_nodes(
167        &mut self,
168        staked_nodes: &Arc<RwLock<StakedNodes>>,
169        client_pubkey: &Pubkey,
170    ) {
171        self.maybe_staked_nodes = Some(staked_nodes.clone());
172        self.maybe_client_pubkey = Some(*client_pubkey);
173    }
174
175    pub fn update_client_endpoint(&mut self, client_socket: UdpSocket) {
176        let runtime = get_runtime();
177        let _guard = runtime.enter();
178        let config = EndpointConfig::default();
179        self.client_endpoint = Some(
180            quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
181                .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new"),
182        );
183    }
184}
185
186pub struct Quic(Arc<QuicClient>);
187impl BaseClientConnection for Quic {
188    type BlockingClientConnection = BlockingQuicClientConnection;
189    type NonblockingClientConnection = NonblockingQuicClientConnection;
190
191    fn new_blocking_connection(
192        &self,
193        _addr: SocketAddr,
194        stats: Arc<ConnectionCacheStats>,
195    ) -> Arc<Self::BlockingClientConnection> {
196        Arc::new(BlockingQuicClientConnection::new_with_client(
197            self.0.clone(),
198            stats,
199        ))
200    }
201
202    fn new_nonblocking_connection(
203        &self,
204        _addr: SocketAddr,
205        stats: Arc<ConnectionCacheStats>,
206    ) -> Arc<Self::NonblockingClientConnection> {
207        Arc::new(NonblockingQuicClientConnection::new_with_client(
208            self.0.clone(),
209            stats,
210        ))
211    }
212}
213
214pub struct QuicConnectionManager {
215    connection_config: QuicConfig,
216}
217
218impl ConnectionManager for QuicConnectionManager {
219    type ConnectionPool = QuicPool;
220    type NewConnectionConfig = QuicConfig;
221
222    const PROTOCOL: Protocol = Protocol::QUIC;
223
224    fn new_connection_pool(&self) -> Self::ConnectionPool {
225        QuicPool {
226            connections: Vec::default(),
227            endpoint: Arc::new(self.connection_config.create_endpoint()),
228        }
229    }
230
231    fn new_connection_config(&self) -> QuicConfig {
232        self.connection_config.clone()
233    }
234
235    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
236        self.connection_config.update_keypair(key);
237        Ok(())
238    }
239}
240
241impl QuicConnectionManager {
242    pub fn new_with_connection_config(connection_config: QuicConfig) -> Self {
243        Self { connection_config }
244    }
245}
246
247pub type QuicConnectionCache = ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
248
249pub fn new_quic_connection_cache(
250    name: &'static str,
251    keypair: &Keypair,
252    ipaddr: IpAddr,
253    staked_nodes: &Arc<RwLock<StakedNodes>>,
254    connection_pool_size: usize,
255) -> Result<QuicConnectionCache, ClientError> {
256    let mut config = QuicConfig::new()?;
257    config.update_client_certificate(keypair, ipaddr);
258    config.set_staked_nodes(staked_nodes, &keypair.pubkey());
259    let connection_manager = QuicConnectionManager::new_with_connection_config(config);
260    ConnectionCache::new(name, connection_manager, connection_pool_size)
261}