solana_quic_client/
lib.rs

1#![allow(clippy::arithmetic_side_effects)]
2
3pub mod nonblocking;
4pub mod quic_client;
5
6#[macro_use]
7extern crate solana_metrics;
8
9use {
10    crate::{
11        nonblocking::quic_client::{
12            QuicClient, QuicClientConnection as NonblockingQuicClientConnection,
13            QuicLazyInitializedEndpoint,
14        },
15        quic_client::{
16            close_quic_connection, QuicClientConnection as BlockingQuicClientConnection,
17        },
18    },
19    log::debug,
20    quic_client::get_runtime,
21    quinn::{Endpoint, EndpointConfig, TokioRuntime},
22    solana_connection_cache::{
23        connection_cache::{
24            BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
25            ConnectionPoolError, NewConnectionConfig, Protocol,
26        },
27        connection_cache_stats::ConnectionCacheStats,
28    },
29    solana_keypair::Keypair,
30    solana_pubkey::Pubkey,
31    solana_signer::Signer,
32    solana_streamer::streamer::StakedNodes,
33    solana_tls_utils::{new_dummy_x509_certificate, QuicClientCertificate},
34    std::{
35        net::{IpAddr, SocketAddr, UdpSocket},
36        sync::{Arc, RwLock},
37    },
38};
39
40pub struct QuicPool {
41    connections: Vec<Arc<Quic>>,
42    endpoint: Arc<QuicLazyInitializedEndpoint>,
43}
44impl ConnectionPool for QuicPool {
45    type BaseClientConnection = Quic;
46    type NewConnectionConfig = QuicConfig;
47
48    fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
49        let connection = self.create_pool_entry(config, addr);
50        let idx = self.connections.len();
51        self.connections.push(connection);
52        idx
53    }
54
55    fn num_connections(&self) -> usize {
56        self.connections.len()
57    }
58
59    fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
60        self.connections
61            .get(index)
62            .cloned()
63            .ok_or(ConnectionPoolError::IndexOutOfRange)
64    }
65
66    fn create_pool_entry(
67        &self,
68        _config: &Self::NewConnectionConfig,
69        addr: &SocketAddr,
70    ) -> Arc<Self::BaseClientConnection> {
71        Arc::new(Quic(Arc::new(QuicClient::new(
72            self.endpoint.clone(),
73            *addr,
74        ))))
75    }
76}
77
78impl Drop for QuicPool {
79    fn drop(&mut self) {
80        debug!(
81            "Dropping QuicPool with {} connections",
82            self.connections.len()
83        );
84        for connection in self.connections.drain(..) {
85            // Explicitly drop each connection to ensure resources are released
86            close_quic_connection(connection.0.clone());
87        }
88    }
89}
90
91pub struct QuicConfig {
92    // Arc to prevent having to copy the struct
93    client_certificate: RwLock<Arc<QuicClientCertificate>>,
94    maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
95    maybe_client_pubkey: Option<Pubkey>,
96
97    // The optional specified endpoint for the quic based client connections
98    // If not specified, the connection cache will create as needed.
99    client_endpoint: Option<Endpoint>,
100}
101
102impl Clone for QuicConfig {
103    fn clone(&self) -> Self {
104        let cert_guard = self.client_certificate.read().unwrap();
105        QuicConfig {
106            client_certificate: RwLock::new(cert_guard.clone()),
107            maybe_staked_nodes: self.maybe_staked_nodes.clone(),
108            maybe_client_pubkey: self.maybe_client_pubkey,
109            client_endpoint: self.client_endpoint.clone(),
110        }
111    }
112}
113
114impl NewConnectionConfig for QuicConfig {
115    fn new() -> Result<Self, ClientError> {
116        let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
117        Ok(Self {
118            client_certificate: RwLock::new(Arc::new(QuicClientCertificate {
119                certificate: cert,
120                key: priv_key,
121            })),
122            maybe_staked_nodes: None,
123            maybe_client_pubkey: None,
124            client_endpoint: None,
125        })
126    }
127}
128
129impl QuicConfig {
130    fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
131        let cert_guard = self.client_certificate.read().unwrap();
132        QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
133    }
134
135    pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) {
136        let (cert, priv_key) = new_dummy_x509_certificate(keypair);
137
138        let mut cert_guard = self.client_certificate.write().unwrap();
139
140        *cert_guard = Arc::new(QuicClientCertificate {
141            certificate: cert,
142            key: priv_key,
143        });
144    }
145
146    pub fn update_keypair(&self, keypair: &Keypair) {
147        let (cert, priv_key) = new_dummy_x509_certificate(keypair);
148
149        let mut cert_guard = self.client_certificate.write().unwrap();
150
151        *cert_guard = Arc::new(QuicClientCertificate {
152            certificate: cert,
153            key: priv_key,
154        });
155    }
156
157    pub fn set_staked_nodes(
158        &mut self,
159        staked_nodes: &Arc<RwLock<StakedNodes>>,
160        client_pubkey: &Pubkey,
161    ) {
162        self.maybe_staked_nodes = Some(staked_nodes.clone());
163        self.maybe_client_pubkey = Some(*client_pubkey);
164    }
165
166    pub fn update_client_endpoint(&mut self, client_socket: UdpSocket) {
167        let runtime = get_runtime();
168        let _guard = runtime.enter();
169        let config = EndpointConfig::default();
170        self.client_endpoint = Some(
171            quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
172                .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new"),
173        );
174    }
175}
176
177pub struct Quic(Arc<QuicClient>);
178impl BaseClientConnection for Quic {
179    type BlockingClientConnection = BlockingQuicClientConnection;
180    type NonblockingClientConnection = NonblockingQuicClientConnection;
181
182    fn new_blocking_connection(
183        &self,
184        _addr: SocketAddr,
185        stats: Arc<ConnectionCacheStats>,
186    ) -> Arc<Self::BlockingClientConnection> {
187        Arc::new(BlockingQuicClientConnection::new_with_client(
188            self.0.clone(),
189            stats,
190        ))
191    }
192
193    fn new_nonblocking_connection(
194        &self,
195        _addr: SocketAddr,
196        stats: Arc<ConnectionCacheStats>,
197    ) -> Arc<Self::NonblockingClientConnection> {
198        Arc::new(NonblockingQuicClientConnection::new_with_client(
199            self.0.clone(),
200            stats,
201        ))
202    }
203}
204
205pub struct QuicConnectionManager {
206    connection_config: QuicConfig,
207}
208
209impl ConnectionManager for QuicConnectionManager {
210    type ConnectionPool = QuicPool;
211    type NewConnectionConfig = QuicConfig;
212
213    const PROTOCOL: Protocol = Protocol::QUIC;
214
215    fn new_connection_pool(&self) -> Self::ConnectionPool {
216        QuicPool {
217            connections: Vec::default(),
218            endpoint: Arc::new(self.connection_config.create_endpoint()),
219        }
220    }
221
222    fn new_connection_config(&self) -> QuicConfig {
223        self.connection_config.clone()
224    }
225
226    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
227        self.connection_config.update_keypair(key);
228        Ok(())
229    }
230}
231
232impl QuicConnectionManager {
233    pub fn new_with_connection_config(connection_config: QuicConfig) -> Self {
234        Self { connection_config }
235    }
236}
237
238pub type QuicConnectionCache = ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
239
240pub fn new_quic_connection_cache(
241    name: &'static str,
242    keypair: &Keypair,
243    ipaddr: IpAddr,
244    staked_nodes: &Arc<RwLock<StakedNodes>>,
245    connection_pool_size: usize,
246) -> Result<QuicConnectionCache, ClientError> {
247    let mut config = QuicConfig::new()?;
248    config.update_client_certificate(keypair, ipaddr);
249    config.set_staked_nodes(staked_nodes, &keypair.pubkey());
250    let connection_manager = QuicConnectionManager::new_with_connection_config(config);
251    ConnectionCache::new(name, connection_manager, connection_pool_size)
252}