1#![allow(clippy::arithmetic_side_effects)]
2
3pub mod nonblocking;
4pub mod quic_client;
5
6#[macro_use]
7extern crate solana_metrics;
8
9use {
10 crate::{
11 nonblocking::quic_client::{
12 QuicClient, QuicClientConnection as NonblockingQuicClientConnection,
13 QuicLazyInitializedEndpoint,
14 },
15 quic_client::{
16 close_quic_connection, QuicClientConnection as BlockingQuicClientConnection,
17 },
18 },
19 log::debug,
20 quic_client::get_runtime,
21 quinn::{Endpoint, EndpointConfig, TokioRuntime},
22 solana_connection_cache::{
23 connection_cache::{
24 BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
25 ConnectionPoolError, NewConnectionConfig, Protocol,
26 },
27 connection_cache_stats::ConnectionCacheStats,
28 },
29 solana_keypair::Keypair,
30 solana_pubkey::Pubkey,
31 solana_signer::Signer,
32 solana_streamer::streamer::StakedNodes,
33 solana_tls_utils::{new_dummy_x509_certificate, QuicClientCertificate},
34 std::{
35 net::{IpAddr, SocketAddr, UdpSocket},
36 sync::{Arc, RwLock},
37 },
38};
39
40pub struct QuicPool {
41 connections: Vec<Arc<Quic>>,
42 endpoint: Arc<QuicLazyInitializedEndpoint>,
43}
44impl ConnectionPool for QuicPool {
45 type BaseClientConnection = Quic;
46 type NewConnectionConfig = QuicConfig;
47
48 fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
49 let connection = self.create_pool_entry(config, addr);
50 let idx = self.connections.len();
51 self.connections.push(connection);
52 idx
53 }
54
55 fn num_connections(&self) -> usize {
56 self.connections.len()
57 }
58
59 fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
60 self.connections
61 .get(index)
62 .cloned()
63 .ok_or(ConnectionPoolError::IndexOutOfRange)
64 }
65
66 fn create_pool_entry(
67 &self,
68 _config: &Self::NewConnectionConfig,
69 addr: &SocketAddr,
70 ) -> Arc<Self::BaseClientConnection> {
71 Arc::new(Quic(Arc::new(QuicClient::new(
72 self.endpoint.clone(),
73 *addr,
74 ))))
75 }
76}
77
78impl Drop for QuicPool {
79 fn drop(&mut self) {
80 debug!(
81 "Dropping QuicPool with {} connections",
82 self.connections.len()
83 );
84 for connection in self.connections.drain(..) {
85 close_quic_connection(connection.0.clone());
87 }
88 }
89}
90
91pub struct QuicConfig {
92 client_certificate: RwLock<Arc<QuicClientCertificate>>,
94 maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
95 maybe_client_pubkey: Option<Pubkey>,
96
97 client_endpoint: Option<Endpoint>,
100}
101
102impl Clone for QuicConfig {
103 fn clone(&self) -> Self {
104 let cert_guard = self.client_certificate.read().unwrap();
105 QuicConfig {
106 client_certificate: RwLock::new(cert_guard.clone()),
107 maybe_staked_nodes: self.maybe_staked_nodes.clone(),
108 maybe_client_pubkey: self.maybe_client_pubkey,
109 client_endpoint: self.client_endpoint.clone(),
110 }
111 }
112}
113
114impl NewConnectionConfig for QuicConfig {
115 fn new() -> Result<Self, ClientError> {
116 let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
117 Ok(Self {
118 client_certificate: RwLock::new(Arc::new(QuicClientCertificate {
119 certificate: cert,
120 key: priv_key,
121 })),
122 maybe_staked_nodes: None,
123 maybe_client_pubkey: None,
124 client_endpoint: None,
125 })
126 }
127}
128
129impl QuicConfig {
130 fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
131 let cert_guard = self.client_certificate.read().unwrap();
132 QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
133 }
134
135 pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) {
136 let (cert, priv_key) = new_dummy_x509_certificate(keypair);
137
138 let mut cert_guard = self.client_certificate.write().unwrap();
139
140 *cert_guard = Arc::new(QuicClientCertificate {
141 certificate: cert,
142 key: priv_key,
143 });
144 }
145
146 pub fn update_keypair(&self, keypair: &Keypair) {
147 let (cert, priv_key) = new_dummy_x509_certificate(keypair);
148
149 let mut cert_guard = self.client_certificate.write().unwrap();
150
151 *cert_guard = Arc::new(QuicClientCertificate {
152 certificate: cert,
153 key: priv_key,
154 });
155 }
156
157 pub fn set_staked_nodes(
158 &mut self,
159 staked_nodes: &Arc<RwLock<StakedNodes>>,
160 client_pubkey: &Pubkey,
161 ) {
162 self.maybe_staked_nodes = Some(staked_nodes.clone());
163 self.maybe_client_pubkey = Some(*client_pubkey);
164 }
165
166 pub fn update_client_endpoint(&mut self, client_socket: UdpSocket) {
167 let runtime = get_runtime();
168 let _guard = runtime.enter();
169 let config = EndpointConfig::default();
170 self.client_endpoint = Some(
171 quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
172 .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new"),
173 );
174 }
175}
176
177pub struct Quic(Arc<QuicClient>);
178impl BaseClientConnection for Quic {
179 type BlockingClientConnection = BlockingQuicClientConnection;
180 type NonblockingClientConnection = NonblockingQuicClientConnection;
181
182 fn new_blocking_connection(
183 &self,
184 _addr: SocketAddr,
185 stats: Arc<ConnectionCacheStats>,
186 ) -> Arc<Self::BlockingClientConnection> {
187 Arc::new(BlockingQuicClientConnection::new_with_client(
188 self.0.clone(),
189 stats,
190 ))
191 }
192
193 fn new_nonblocking_connection(
194 &self,
195 _addr: SocketAddr,
196 stats: Arc<ConnectionCacheStats>,
197 ) -> Arc<Self::NonblockingClientConnection> {
198 Arc::new(NonblockingQuicClientConnection::new_with_client(
199 self.0.clone(),
200 stats,
201 ))
202 }
203}
204
205pub struct QuicConnectionManager {
206 connection_config: QuicConfig,
207}
208
209impl ConnectionManager for QuicConnectionManager {
210 type ConnectionPool = QuicPool;
211 type NewConnectionConfig = QuicConfig;
212
213 const PROTOCOL: Protocol = Protocol::QUIC;
214
215 fn new_connection_pool(&self) -> Self::ConnectionPool {
216 QuicPool {
217 connections: Vec::default(),
218 endpoint: Arc::new(self.connection_config.create_endpoint()),
219 }
220 }
221
222 fn new_connection_config(&self) -> QuicConfig {
223 self.connection_config.clone()
224 }
225
226 fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
227 self.connection_config.update_keypair(key);
228 Ok(())
229 }
230}
231
232impl QuicConnectionManager {
233 pub fn new_with_connection_config(connection_config: QuicConfig) -> Self {
234 Self { connection_config }
235 }
236}
237
238pub type QuicConnectionCache = ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
239
240pub fn new_quic_connection_cache(
241 name: &'static str,
242 keypair: &Keypair,
243 ipaddr: IpAddr,
244 staked_nodes: &Arc<RwLock<StakedNodes>>,
245 connection_pool_size: usize,
246) -> Result<QuicConnectionCache, ClientError> {
247 let mut config = QuicConfig::new()?;
248 config.update_client_certificate(keypair, ipaddr);
249 config.set_staked_nodes(staked_nodes, &keypair.pubkey());
250 let connection_manager = QuicConnectionManager::new_with_connection_config(config);
251 ConnectionCache::new(name, connection_manager, connection_pool_size)
252}