solana_client/
connection_cache.rs

1pub use solana_connection_cache::connection_cache::Protocol;
2use {
3    solana_connection_cache::{
4        client_connection::ClientConnection,
5        connection_cache::{
6            BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool,
7            NewConnectionConfig,
8        },
9    },
10    solana_keypair::Keypair,
11    solana_pubkey::Pubkey,
12    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
13    solana_quic_definitions::NotifyKeyUpdate,
14    solana_streamer::streamer::StakedNodes,
15    solana_transaction_error::TransportResult,
16    solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
17    std::{
18        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
19        sync::{Arc, RwLock},
20    },
21};
22
/// Connection pool size used by [`ConnectionCache::new`].
const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
/// When `true`, [`ConnectionCache::new`] builds a QUIC-backed cache; when `false` it
/// builds a UDP-backed one.
const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
25
/// A thin wrapper over the backend `ConnectionCache` from the `solana_connection_cache`
/// crate that eases construction of a cache for code that deals with both UDP and QUIC.
///
/// Code that only ever uses UDP or only ever uses QUIC can use the backend
/// `ConnectionCache` directly instead.
pub enum ConnectionCache {
    /// Backend cache parameterized with the QUIC pool, connection manager and config.
    Quic(Arc<BackendConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>>),
    /// Backend cache parameterized with the UDP pool, connection manager and config.
    Udp(Arc<BackendConnectionCache<UdpPool, UdpConnectionManager, UdpConfig>>),
}
33
// Shorthand for the base connection types associated with each pool; they are used
// below to name the blocking / nonblocking connection types without repeating the
// full projection through `ConnectionPool`.
type QuicBaseClientConnection = <QuicPool as ConnectionPool>::BaseClientConnection;
type UdpBaseClientConnection = <UdpPool as ConnectionPool>::BaseClientConnection;
36
/// Blocking connection handle returned by [`ConnectionCache::get_connection`].
///
/// Each variant holds the shared blocking connection type of the corresponding
/// transport.
pub enum BlockingClientConnection {
    /// Blocking connection of the QUIC base client connection.
    Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
    /// Blocking connection of the UDP base client connection.
    Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
}
41
/// Nonblocking connection handle returned by
/// [`ConnectionCache::get_nonblocking_connection`].
///
/// Each variant holds the shared nonblocking connection type of the corresponding
/// transport.
pub enum NonblockingClientConnection {
    /// Nonblocking connection of the QUIC base client connection.
    Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
    /// Nonblocking connection of the UDP base client connection.
    Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
}
46
47impl NotifyKeyUpdate for ConnectionCache {
48    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
49        match self {
50            Self::Udp(_) => Ok(()),
51            Self::Quic(backend) => backend.update_key(key),
52        }
53    }
54}
55
56impl ConnectionCache {
57    pub fn new(name: &'static str) -> Self {
58        if DEFAULT_CONNECTION_CACHE_USE_QUIC {
59            let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
60            ConnectionCache::new_with_client_options(
61                name,
62                DEFAULT_CONNECTION_POOL_SIZE,
63                None, // client_endpoint
64                Some(cert_info),
65                None, // stake_info
66            )
67        } else {
68            ConnectionCache::with_udp(name, DEFAULT_CONNECTION_POOL_SIZE)
69        }
70    }
71
72    /// Create a quic connection_cache
73    pub fn new_quic(name: &'static str, connection_pool_size: usize) -> Self {
74        Self::new_with_client_options(name, connection_pool_size, None, None, None)
75    }
76
77    #[cfg(feature = "dev-context-only-utils")]
78    pub fn new_quic_for_tests(name: &'static str, connection_pool_size: usize) -> Self {
79        Self::new_with_client_options(
80            name,
81            connection_pool_size,
82            Some(solana_net_utils::sockets::bind_to_localhost_unique().unwrap()),
83            None,
84            None,
85        )
86    }
87
88    /// Create a quic connection_cache with more client options
89    pub fn new_with_client_options(
90        name: &'static str,
91        connection_pool_size: usize,
92        client_socket: Option<UdpSocket>,
93        cert_info: Option<(&Keypair, IpAddr)>,
94        stake_info: Option<(&Arc<RwLock<StakedNodes>>, &Pubkey)>,
95    ) -> Self {
96        // The minimum pool size is 1.
97        let connection_pool_size = 1.max(connection_pool_size);
98        let mut config = QuicConfig::new().unwrap();
99        if let Some(cert_info) = cert_info {
100            config.update_client_certificate(cert_info.0, cert_info.1);
101        }
102        if let Some(client_socket) = client_socket {
103            config.update_client_endpoint(client_socket);
104        }
105        if let Some(stake_info) = stake_info {
106            config.set_staked_nodes(stake_info.0, stake_info.1);
107        }
108        let connection_manager = QuicConnectionManager::new_with_connection_config(config);
109        let cache =
110            BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
111        Self::Quic(Arc::new(cache))
112    }
113
114    #[inline]
115    pub fn protocol(&self) -> Protocol {
116        match self {
117            Self::Quic(_) => Protocol::QUIC,
118            Self::Udp(_) => Protocol::UDP,
119        }
120    }
121
122    pub fn with_udp(name: &'static str, connection_pool_size: usize) -> Self {
123        // The minimum pool size is 1.
124        let connection_pool_size = 1.max(connection_pool_size);
125        let connection_manager = UdpConnectionManager::default();
126        let cache =
127            BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
128        Self::Udp(Arc::new(cache))
129    }
130
131    pub fn use_quic(&self) -> bool {
132        matches!(self, Self::Quic(_))
133    }
134
135    pub fn get_connection(&self, addr: &SocketAddr) -> BlockingClientConnection {
136        match self {
137            Self::Quic(cache) => BlockingClientConnection::Quic(cache.get_connection(addr)),
138            Self::Udp(cache) => BlockingClientConnection::Udp(cache.get_connection(addr)),
139        }
140    }
141
142    pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingClientConnection {
143        match self {
144            Self::Quic(cache) => {
145                NonblockingClientConnection::Quic(cache.get_nonblocking_connection(addr))
146            }
147            Self::Udp(cache) => {
148                NonblockingClientConnection::Udp(cache.get_nonblocking_connection(addr))
149            }
150        }
151    }
152}
153
/// Generates a method that forwards to the same-named method on the value held by
/// whichever of the `Quic` / `Udp` variants `self` currently is.
///
/// The input is a method signature without a body: optional attributes, an optional
/// visibility, `fn name`, an optional generic list whose parameters each have the
/// form `T: Bound + ?Sized`, a `&self` or `&mut self` receiver followed by
/// `name: Type` arguments, and an optional return type. The generated method is
/// marked `#[inline]` and passes its arguments through unchanged.
macro_rules! dispatch {
    // `&self` receiver.
    ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
        #[inline]
        $(#[$meta])*
        $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&self $(, $arg:$ty)*) $(-> $out)? {
            match self {
                Self::Quic(this) => this.$name($($arg, )*),
                Self::Udp(this) => this.$name($($arg, )*),
            }
        }
    };
    // `&mut self` receiver; otherwise identical to the arm above.
    ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&mut self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
        #[inline]
        $(#[$meta])*
        $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&mut self $(, $arg:$ty)*) $(-> $out)? {
            match self {
                Self::Quic(this) => this.$name($($arg, )*),
                Self::Udp(this) => this.$name($($arg, )*),
            }
        }
    };
}
176
177pub(crate) use dispatch;
178
179impl ClientConnection for BlockingClientConnection {
180    dispatch!(fn server_addr(&self) -> &SocketAddr);
181    dispatch!(fn send_data(&self, buffer: &[u8]) -> TransportResult<()>);
182    dispatch!(fn send_data_async(&self, buffer: Arc<Vec<u8>>) -> TransportResult<()>);
183    dispatch!(fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()>);
184    dispatch!(fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>);
185}
186
187#[async_trait::async_trait]
188impl solana_connection_cache::nonblocking::client_connection::ClientConnection
189    for NonblockingClientConnection
190{
191    dispatch!(fn server_addr(&self) -> &SocketAddr);
192
193    async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
194        match self {
195            Self::Quic(cache) => Ok(cache.send_data(buffer).await?),
196            Self::Udp(cache) => Ok(cache.send_data(buffer).await?),
197        }
198    }
199
200    async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
201        match self {
202            Self::Quic(cache) => Ok(cache.send_data_batch(buffers).await?),
203            Self::Udp(cache) => Ok(cache.send_data_batch(buffers).await?),
204        }
205    }
206}
207
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::connection_cache::ConnectionCache,
        solana_net_utils::sockets::{bind_to, localhost_port_range_for_tests},
        std::net::{IpAddr, Ipv4Addr, SocketAddr},
    };

    /// A cache built with an explicit client socket must hand out connections whose
    /// `server_addr` port matches the port of the address that was requested.
    #[test]
    fn test_connection_with_specified_client_endpoint() {
        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let range = localhost_port_range_for_tests();
        let mut ports = range.0..range.1;

        // The first port of the range is taken by the client endpoint.
        let client_socket = bind_to(localhost, ports.next().unwrap()).unwrap();
        let connection_cache = ConnectionCache::new_with_client_options(
            "connection_cache_test",
            1,                   // connection_pool_size
            Some(client_socket), // client_socket
            None,                // cert_info
            None,                // stake_info
        );

        // The next two ports act as two distinct server addresses.
        for _ in 0..2 {
            let server_port = ports.next().unwrap();
            let server_addr = SocketAddr::new(localhost, server_port);
            let conn = connection_cache.get_connection(&server_addr);
            assert_eq!(conn.server_addr().port(), server_port);
        }
    }
}