solana_quic_client/
quic_client.rs1use {
5 crate::nonblocking::quic_client::{
6 QuicClient, QuicClientConnection as NonblockingQuicConnection, QuicLazyInitializedEndpoint,
7 },
8 log::*,
9 solana_connection_cache::{
10 client_connection::{ClientConnection, ClientStats},
11 connection_cache_stats::ConnectionCacheStats,
12 nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
13 },
14 solana_transaction_error::{TransportError, TransportResult},
15 std::{
16 net::SocketAddr,
17 sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
18 time::Duration,
19 },
20 tokio::{runtime::Runtime, time::timeout},
21};
22
/// Permit limit handed to the global async-task semaphore: the highest counter value at which
/// an asynchronous send is still admitted without blocking the caller.
pub const MAX_OUTSTANDING_TASK: u64 = 2_000;

/// Time budget for sending a single buffer; batch sends scale this by the batch size.
const SEND_DATA_TIMEOUT: Duration = Duration::from_secs(10);
25
/// Counting semaphore used to bound the number of outstanding asynchronous tasks.
///
/// It is built from a `Mutex`-protected counter and a `Condvar`, so [`acquire`](Self::acquire)
/// blocks the calling *thread* while the limit is exceeded.
struct AsyncTaskSemaphore {
    /// Number of permits currently claimed. A caller blocked inside `acquire` has already
    /// been added to this count.
    counter: Mutex<u64>,
    /// Notified by `release` every time a permit is handed back.
    cond_var: Condvar,
    /// Largest value `counter` may hold for `acquire` to return.
    permits: u64,
}

impl AsyncTaskSemaphore {
    /// Creates a semaphore with no permits claimed and `permits` as its limit.
    pub fn new(permits: u64) -> Self {
        Self {
            counter: Mutex::new(0),
            cond_var: Condvar::new(),
            permits,
        }
    }

    /// Claims one permit, blocking the current thread while the counter exceeds the limit.
    ///
    /// The counter is incremented *before* waiting, so blocked callers count towards the
    /// limit as well.
    ///
    /// On return the counter mutex is still locked and the guard is handed to the caller.
    /// Dropping the guard only unlocks the mutex; it does **not** give the permit back.
    /// Every successful `acquire` must be paired with a later call to
    /// [`release`](Self::release).
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned.
    pub fn acquire(&self) -> MutexGuard<'_, u64> {
        let mut claimed = self.counter.lock().unwrap();
        *claimed += 1;
        // `wait_while` re-checks the predicate after every wake-up, so spurious wake-ups and
        // notifications that arrive while still over the limit simply put us back to sleep.
        self.cond_var
            .wait_while(claimed, |claimed| *claimed > self.permits)
            .unwrap()
    }

    /// Hands one permit back and wakes a single thread blocked in `acquire`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned, or (when overflow checks are enabled)
    /// if called while no permit is claimed.
    pub fn release(&self) {
        let mut claimed = self.counter.lock().unwrap();
        *claimed -= 1;
        self.cond_var.notify_one();
    }
}
66
67static ASYNC_TASK_SEMAPHORE: std::sync::LazyLock<AsyncTaskSemaphore> =
68 std::sync::LazyLock::new(|| AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK));
69static RUNTIME: std::sync::LazyLock<Runtime> = std::sync::LazyLock::new(|| {
70 tokio::runtime::Builder::new_multi_thread()
71 .thread_name("solQuicClientRt")
72 .enable_all()
73 .build()
74 .unwrap()
75});
76
77pub fn get_runtime() -> &'static Runtime {
78 &RUNTIME
79}
80
81async fn send_data_async(
82 connection: Arc<NonblockingQuicConnection>,
83 buffer: Vec<u8>,
84) -> TransportResult<()> {
85 let result = timeout(SEND_DATA_TIMEOUT, connection.send_data(&buffer)).await;
86 ASYNC_TASK_SEMAPHORE.release();
87 handle_send_result(result, connection)
88}
89
90async fn send_data_batch_async(
91 connection: Arc<NonblockingQuicConnection>,
92 buffers: Vec<Vec<u8>>,
93) -> TransportResult<()> {
94 let result = timeout(
95 u32::try_from(buffers.len())
96 .map(|size| SEND_DATA_TIMEOUT.saturating_mul(size))
97 .unwrap_or(Duration::MAX),
98 connection.send_data_batch(&buffers),
99 )
100 .await;
101 ASYNC_TASK_SEMAPHORE.release();
102 handle_send_result(result, connection)
103}
104
105fn handle_send_result(
107 result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
108 connection: Arc<NonblockingQuicConnection>,
109) -> Result<(), TransportError> {
110 match result {
111 Ok(result) => result,
112 Err(_err) => {
113 let client_stats = ClientStats::default();
114 client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
115 let stats = connection.connection_stats();
116 stats.add_client_stats(&client_stats, 0, false);
117 info!("Timedout sending data {:?}", connection.server_addr());
118 Err(TransportError::Custom("Timedout sending data".to_string()))
119 }
120 }
121}
122
123pub struct QuicClientConnection {
124 pub inner: Arc<NonblockingQuicConnection>,
125}
126
127impl QuicClientConnection {
128 pub fn new(
129 endpoint: Arc<QuicLazyInitializedEndpoint>,
130 server_addr: SocketAddr,
131 connection_stats: Arc<ConnectionCacheStats>,
132 ) -> Self {
133 let inner = Arc::new(NonblockingQuicConnection::new(
134 endpoint,
135 server_addr,
136 connection_stats,
137 ));
138 Self { inner }
139 }
140
141 pub fn new_with_client(
142 client: Arc<QuicClient>,
143 connection_stats: Arc<ConnectionCacheStats>,
144 ) -> Self {
145 let inner = Arc::new(NonblockingQuicConnection::new_with_client(
146 client,
147 connection_stats,
148 ));
149 Self { inner }
150 }
151}
152
153impl ClientConnection for QuicClientConnection {
154 fn server_addr(&self) -> &SocketAddr {
155 self.inner.server_addr()
156 }
157
158 fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
159 RUNTIME.block_on(self.inner.send_data_batch(buffers))?;
160 Ok(())
161 }
162
163 fn send_data_async(&self, data: Vec<u8>) -> TransportResult<()> {
164 let _lock = ASYNC_TASK_SEMAPHORE.acquire();
165 let inner = self.inner.clone();
166
167 let _handle = RUNTIME.spawn(send_data_async(inner, data));
168 Ok(())
169 }
170
171 fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
172 let _lock = ASYNC_TASK_SEMAPHORE.acquire();
173 let inner = self.inner.clone();
174 let _handle = RUNTIME.spawn(send_data_batch_async(inner, buffers));
175 Ok(())
176 }
177
178 fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
179 RUNTIME.block_on(self.inner.send_data(buffer))?;
180 Ok(())
181 }
182}
183
184pub(crate) fn close_quic_connection(connection: Arc<QuicClient>) {
185 trace!("Closing QUIC connection to {}", connection.server_addr());
187 RUNTIME.block_on(connection.close());
188}