1pub use solana_connection_cache::connection_cache::Protocol;
2use {
3 solana_connection_cache::{
4 client_connection::ClientConnection,
5 connection_cache::{
6 BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool,
7 NewConnectionConfig,
8 },
9 },
10 solana_keypair::Keypair,
11 solana_pubkey::Pubkey,
12 solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
13 solana_quic_definitions::NotifyKeyUpdate,
14 solana_streamer::streamer::StakedNodes,
15 solana_transaction_error::TransportResult,
16 solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
17 std::{
18 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
19 sync::{Arc, RwLock},
20 },
21};
22
23const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
24const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
25
26pub enum ConnectionCache {
30 Quic(Arc<BackendConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>>),
31 Udp(Arc<BackendConnectionCache<UdpPool, UdpConnectionManager, UdpConfig>>),
32}
33
34type QuicBaseClientConnection = <QuicPool as ConnectionPool>::BaseClientConnection;
35type UdpBaseClientConnection = <UdpPool as ConnectionPool>::BaseClientConnection;
36
37pub enum BlockingClientConnection {
38 Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
39 Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
40}
41
42pub enum NonblockingClientConnection {
43 Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
44 Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
45}
46
47impl NotifyKeyUpdate for ConnectionCache {
48 fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
49 match self {
50 Self::Udp(_) => Ok(()),
51 Self::Quic(backend) => backend.update_key(key),
52 }
53 }
54}
55
56impl ConnectionCache {
57 pub fn new(name: &'static str) -> Self {
58 if DEFAULT_CONNECTION_CACHE_USE_QUIC {
59 let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
60 ConnectionCache::new_with_client_options(
61 name,
62 DEFAULT_CONNECTION_POOL_SIZE,
63 None, Some(cert_info),
65 None, )
67 } else {
68 ConnectionCache::with_udp(name, DEFAULT_CONNECTION_POOL_SIZE)
69 }
70 }
71
72 pub fn new_quic(name: &'static str, connection_pool_size: usize) -> Self {
74 Self::new_with_client_options(name, connection_pool_size, None, None, None)
75 }
76
77 #[cfg(feature = "dev-context-only-utils")]
78 pub fn new_quic_for_tests(name: &'static str, connection_pool_size: usize) -> Self {
79 Self::new_with_client_options(
80 name,
81 connection_pool_size,
82 Some(solana_net_utils::sockets::bind_to_localhost_unique().unwrap()),
83 None,
84 None,
85 )
86 }
87
88 pub fn new_with_client_options(
90 name: &'static str,
91 connection_pool_size: usize,
92 client_socket: Option<UdpSocket>,
93 cert_info: Option<(&Keypair, IpAddr)>,
94 stake_info: Option<(&Arc<RwLock<StakedNodes>>, &Pubkey)>,
95 ) -> Self {
96 let connection_pool_size = 1.max(connection_pool_size);
98 let mut config = QuicConfig::new().unwrap();
99 if let Some(cert_info) = cert_info {
100 config.update_client_certificate(cert_info.0, cert_info.1);
101 }
102 if let Some(client_socket) = client_socket {
103 config.update_client_endpoint(client_socket);
104 }
105 if let Some(stake_info) = stake_info {
106 config.set_staked_nodes(stake_info.0, stake_info.1);
107 }
108 let connection_manager = QuicConnectionManager::new_with_connection_config(config);
109 let cache =
110 BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
111 Self::Quic(Arc::new(cache))
112 }
113
114 #[inline]
115 pub fn protocol(&self) -> Protocol {
116 match self {
117 Self::Quic(_) => Protocol::QUIC,
118 Self::Udp(_) => Protocol::UDP,
119 }
120 }
121
122 pub fn with_udp(name: &'static str, connection_pool_size: usize) -> Self {
123 let connection_pool_size = 1.max(connection_pool_size);
125 let connection_manager = UdpConnectionManager::default();
126 let cache =
127 BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
128 Self::Udp(Arc::new(cache))
129 }
130
131 pub fn use_quic(&self) -> bool {
132 matches!(self, Self::Quic(_))
133 }
134
135 pub fn get_connection(&self, addr: &SocketAddr) -> BlockingClientConnection {
136 match self {
137 Self::Quic(cache) => BlockingClientConnection::Quic(cache.get_connection(addr)),
138 Self::Udp(cache) => BlockingClientConnection::Udp(cache.get_connection(addr)),
139 }
140 }
141
142 pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingClientConnection {
143 match self {
144 Self::Quic(cache) => {
145 NonblockingClientConnection::Quic(cache.get_nonblocking_connection(addr))
146 }
147 Self::Udp(cache) => {
148 NonblockingClientConnection::Udp(cache.get_nonblocking_connection(addr))
149 }
150 }
151 }
152}
153
154macro_rules! dispatch {
155 ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
156 #[inline]
157 $(#[$meta])*
158 $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&self $(, $arg:$ty)*) $(-> $out)? {
159 match self {
160 Self::Quic(this) => this.$name($($arg, )*),
161 Self::Udp(this) => this.$name($($arg, )*),
162 }
163 }
164 };
165 ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&mut self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
166 #[inline]
167 $(#[$meta])*
168 $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&mut self $(, $arg:$ty)*) $(-> $out)? {
169 match self {
170 Self::Quic(this) => this.$name($($arg, )*),
171 Self::Udp(this) => this.$name($($arg, )*),
172 }
173 }
174 };
175}
176
177pub(crate) use dispatch;
178
179impl ClientConnection for BlockingClientConnection {
180 dispatch!(fn server_addr(&self) -> &SocketAddr);
181 dispatch!(fn send_data(&self, buffer: &[u8]) -> TransportResult<()>);
182 dispatch!(fn send_data_async(&self, buffer: Arc<Vec<u8>>) -> TransportResult<()>);
183 dispatch!(fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()>);
184 dispatch!(fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>);
185}
186
187#[async_trait::async_trait]
188impl solana_connection_cache::nonblocking::client_connection::ClientConnection
189 for NonblockingClientConnection
190{
191 dispatch!(fn server_addr(&self) -> &SocketAddr);
192
193 async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
194 match self {
195 Self::Quic(cache) => Ok(cache.send_data(buffer).await?),
196 Self::Udp(cache) => Ok(cache.send_data(buffer).await?),
197 }
198 }
199
200 async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
201 match self {
202 Self::Quic(cache) => Ok(cache.send_data_batch(buffers).await?),
203 Self::Udp(cache) => Ok(cache.send_data_batch(buffers).await?),
204 }
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use {
211 super::*,
212 crate::connection_cache::ConnectionCache,
213 solana_net_utils::sockets::{bind_to, localhost_port_range_for_tests},
214 std::net::{IpAddr, Ipv4Addr, SocketAddr},
215 };
216
217 #[test]
218 fn test_connection_with_specified_client_endpoint() {
219 let port_range = localhost_port_range_for_tests();
220 let mut port_range = port_range.0..port_range.1;
221 let client_socket =
222 bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), port_range.next().unwrap()).unwrap();
223 let connection_cache = ConnectionCache::new_with_client_options(
224 "connection_cache_test",
225 1, Some(client_socket), None, None, );
230
231 let port1 = port_range.next().unwrap();
233 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1);
234 let conn = connection_cache.get_connection(&addr);
235 assert_eq!(conn.server_addr().port(), port1);
236
237 let port2 = port_range.next().unwrap();
239 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2);
240 let conn = connection_cache.get_connection(&addr);
241 assert_eq!(conn.server_addr().port(), port2);
242 }
243}