1#![cfg_attr(
2 not(feature = "agave-unstable-api"),
3 deprecated(
4 since = "3.1.0",
5 note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
6 v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
7 acknowledge use of an interface that may break without warning."
8 )
9)]
10#![allow(clippy::arithmetic_side_effects)]
11
12pub mod nonblocking;
13pub mod quic_client;
14
15#[macro_use]
16extern crate solana_metrics;
17
18use {
19 crate::{
20 nonblocking::quic_client::{
21 QuicClient, QuicClientConnection as NonblockingQuicClientConnection,
22 QuicLazyInitializedEndpoint,
23 },
24 quic_client::{
25 close_quic_connection, QuicClientConnection as BlockingQuicClientConnection,
26 },
27 },
28 log::debug,
29 quic_client::get_runtime,
30 quinn::{Endpoint, EndpointConfig, TokioRuntime},
31 solana_connection_cache::{
32 connection_cache::{
33 BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
34 ConnectionPoolError, NewConnectionConfig, Protocol,
35 },
36 connection_cache_stats::ConnectionCacheStats,
37 },
38 solana_keypair::Keypair,
39 solana_pubkey::Pubkey,
40 solana_signer::Signer,
41 solana_streamer::streamer::StakedNodes,
42 solana_tls_utils::{new_dummy_x509_certificate, QuicClientCertificate},
43 std::{
44 net::{IpAddr, SocketAddr, UdpSocket},
45 sync::{Arc, RwLock},
46 },
47};
48
49pub struct QuicPool {
50 connections: Vec<Arc<Quic>>,
51 endpoint: Arc<QuicLazyInitializedEndpoint>,
52}
53impl ConnectionPool for QuicPool {
54 type BaseClientConnection = Quic;
55 type NewConnectionConfig = QuicConfig;
56
57 fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
58 let connection = self.create_pool_entry(config, addr);
59 let idx = self.connections.len();
60 self.connections.push(connection);
61 idx
62 }
63
64 fn num_connections(&self) -> usize {
65 self.connections.len()
66 }
67
68 fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
69 self.connections
70 .get(index)
71 .cloned()
72 .ok_or(ConnectionPoolError::IndexOutOfRange)
73 }
74
75 fn create_pool_entry(
76 &self,
77 _config: &Self::NewConnectionConfig,
78 addr: &SocketAddr,
79 ) -> Arc<Self::BaseClientConnection> {
80 Arc::new(Quic(Arc::new(QuicClient::new(
81 self.endpoint.clone(),
82 *addr,
83 ))))
84 }
85}
86
87impl Drop for QuicPool {
88 fn drop(&mut self) {
89 debug!(
90 "Dropping QuicPool with {} connections",
91 self.connections.len()
92 );
93 for connection in self.connections.drain(..) {
94 close_quic_connection(connection.0.clone());
96 }
97 }
98}
99
100pub struct QuicConfig {
101 client_certificate: RwLock<Arc<QuicClientCertificate>>,
103 maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
104 maybe_client_pubkey: Option<Pubkey>,
105
106 client_endpoint: Option<Endpoint>,
109}
110
111impl Clone for QuicConfig {
112 fn clone(&self) -> Self {
113 let cert_guard = self.client_certificate.read().unwrap();
114 QuicConfig {
115 client_certificate: RwLock::new(cert_guard.clone()),
116 maybe_staked_nodes: self.maybe_staked_nodes.clone(),
117 maybe_client_pubkey: self.maybe_client_pubkey,
118 client_endpoint: self.client_endpoint.clone(),
119 }
120 }
121}
122
123impl NewConnectionConfig for QuicConfig {
124 fn new() -> Result<Self, ClientError> {
125 let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
126 Ok(Self {
127 client_certificate: RwLock::new(Arc::new(QuicClientCertificate {
128 certificate: cert,
129 key: priv_key,
130 })),
131 maybe_staked_nodes: None,
132 maybe_client_pubkey: None,
133 client_endpoint: None,
134 })
135 }
136}
137
138impl QuicConfig {
139 fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
140 let cert_guard = self.client_certificate.read().unwrap();
141 QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
142 }
143
144 pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) {
145 let (cert, priv_key) = new_dummy_x509_certificate(keypair);
146
147 let mut cert_guard = self.client_certificate.write().unwrap();
148
149 *cert_guard = Arc::new(QuicClientCertificate {
150 certificate: cert,
151 key: priv_key,
152 });
153 }
154
155 pub fn update_keypair(&self, keypair: &Keypair) {
156 let (cert, priv_key) = new_dummy_x509_certificate(keypair);
157
158 let mut cert_guard = self.client_certificate.write().unwrap();
159
160 *cert_guard = Arc::new(QuicClientCertificate {
161 certificate: cert,
162 key: priv_key,
163 });
164 }
165
166 pub fn set_staked_nodes(
167 &mut self,
168 staked_nodes: &Arc<RwLock<StakedNodes>>,
169 client_pubkey: &Pubkey,
170 ) {
171 self.maybe_staked_nodes = Some(staked_nodes.clone());
172 self.maybe_client_pubkey = Some(*client_pubkey);
173 }
174
175 pub fn update_client_endpoint(&mut self, client_socket: UdpSocket) {
176 let runtime = get_runtime();
177 let _guard = runtime.enter();
178 let config = EndpointConfig::default();
179 self.client_endpoint = Some(
180 quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
181 .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new"),
182 );
183 }
184}
185
186pub struct Quic(Arc<QuicClient>);
187impl BaseClientConnection for Quic {
188 type BlockingClientConnection = BlockingQuicClientConnection;
189 type NonblockingClientConnection = NonblockingQuicClientConnection;
190
191 fn new_blocking_connection(
192 &self,
193 _addr: SocketAddr,
194 stats: Arc<ConnectionCacheStats>,
195 ) -> Arc<Self::BlockingClientConnection> {
196 Arc::new(BlockingQuicClientConnection::new_with_client(
197 self.0.clone(),
198 stats,
199 ))
200 }
201
202 fn new_nonblocking_connection(
203 &self,
204 _addr: SocketAddr,
205 stats: Arc<ConnectionCacheStats>,
206 ) -> Arc<Self::NonblockingClientConnection> {
207 Arc::new(NonblockingQuicClientConnection::new_with_client(
208 self.0.clone(),
209 stats,
210 ))
211 }
212}
213
214pub struct QuicConnectionManager {
215 connection_config: QuicConfig,
216}
217
218impl ConnectionManager for QuicConnectionManager {
219 type ConnectionPool = QuicPool;
220 type NewConnectionConfig = QuicConfig;
221
222 const PROTOCOL: Protocol = Protocol::QUIC;
223
224 fn new_connection_pool(&self) -> Self::ConnectionPool {
225 QuicPool {
226 connections: Vec::default(),
227 endpoint: Arc::new(self.connection_config.create_endpoint()),
228 }
229 }
230
231 fn new_connection_config(&self) -> QuicConfig {
232 self.connection_config.clone()
233 }
234
235 fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
236 self.connection_config.update_keypair(key);
237 Ok(())
238 }
239}
240
241impl QuicConnectionManager {
242 pub fn new_with_connection_config(connection_config: QuicConfig) -> Self {
243 Self { connection_config }
244 }
245}
246
247pub type QuicConnectionCache = ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
248
249pub fn new_quic_connection_cache(
250 name: &'static str,
251 keypair: &Keypair,
252 ipaddr: IpAddr,
253 staked_nodes: &Arc<RwLock<StakedNodes>>,
254 connection_pool_size: usize,
255) -> Result<QuicConnectionCache, ClientError> {
256 let mut config = QuicConfig::new()?;
257 config.update_client_certificate(keypair, ipaddr);
258 config.set_staked_nodes(staked_nodes, &keypair.pubkey());
259 let connection_manager = QuicConnectionManager::new_with_connection_config(config);
260 ConnectionCache::new(name, connection_manager, connection_pool_size)
261}