solana_quic_client/
quic_client.rs

1//! Simple client that connects to a given UDP port with the QUIC protocol and provides
2//! an interface for sending data which is restricted by the server's flow control.
3
4use {
5    crate::nonblocking::quic_client::{
6        QuicClient, QuicClientConnection as NonblockingQuicConnection, QuicLazyInitializedEndpoint,
7    },
8    log::*,
9    solana_connection_cache::{
10        client_connection::{ClientConnection, ClientStats},
11        connection_cache_stats::ConnectionCacheStats,
12        nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
13    },
14    solana_transaction_error::{TransportError, TransportResult},
15    std::{
16        net::SocketAddr,
17        sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
18        time::Duration,
19    },
20    tokio::{runtime::Runtime, time::timeout},
21};
22
/// Upper bound on the number of outstanding asynchronous send tasks. It is
/// used as the permit count of the process-wide `ASYNC_TASK_SEMAPHORE`.
pub const MAX_OUTSTANDING_TASK: u64 = 2000;
/// Timeout applied to sending a single buffer. Batch sends multiply this value
/// by the number of buffers in the batch.
const SEND_DATA_TIMEOUT: Duration = Duration::from_secs(10);
25
/// A counting semaphore used for limiting the number of asynchronous tasks
/// spawned onto the runtime. Before spawning a task, call `acquire`. After the
/// task is done (be it success or failure), call `release`.
struct AsyncTaskSemaphore {
    /// Number of permits currently held, i.e. callers that were admitted by
    /// `acquire` and have not yet called `release`. Callers that are still
    /// blocked inside `acquire` are not counted.
    counter: Mutex<u64>,
    /// Condition variable signalled every time `counter` is decremented.
    cond_var: Condvar,
    /// The maximum usage allowed by this semaphore.
    permits: u64,
}

impl AsyncTaskSemaphore {
    /// Creates a semaphore that admits at most `permits` concurrent holders.
    pub fn new(permits: u64) -> Self {
        Self {
            counter: Mutex::new(0),
            cond_var: Condvar::new(),
            permits,
        }
    }

    /// Blocks the calling thread until a permit is free, then takes it.
    ///
    /// When returned, the lock has been locked and the usage count has been
    /// incremented. When the returned `MutexGuard` is dropped the lock is
    /// dropped without decrementing the usage count; the permit stays held
    /// until `release` is called.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn acquire(&self) -> MutexGuard<'_, u64> {
        let guard = self.counter.lock().unwrap();
        // Wait *before* incrementing. If blocked callers were counted as
        // holders, a `release` could leave the counter above `permits` even
        // though a permit is free; the single notified waiter would go back to
        // sleep and every waiter could hang with nothing left to wake them.
        let mut held = self
            .cond_var
            .wait_while(guard, |held| *held >= self.permits)
            .unwrap();
        *held += 1;
        held
    }

    /// Acquires the lock, decrements the usage count and wakes one waiter.
    ///
    /// Must be called exactly once for every successful `acquire`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn release(&self) {
        let mut held = self.counter.lock().unwrap();
        *held -= 1;
        // Exactly one permit became free, so waking a single waiter suffices.
        self.cond_var.notify_one();
    }
}
66
67static ASYNC_TASK_SEMAPHORE: std::sync::LazyLock<AsyncTaskSemaphore> =
68    std::sync::LazyLock::new(|| AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK));
69static RUNTIME: std::sync::LazyLock<Runtime> = std::sync::LazyLock::new(|| {
70    tokio::runtime::Builder::new_multi_thread()
71        .thread_name("solQuicClientRt")
72        .enable_all()
73        .build()
74        .unwrap()
75});
76
77pub fn get_runtime() -> &'static Runtime {
78    &RUNTIME
79}
80
81async fn send_data_async(
82    connection: Arc<NonblockingQuicConnection>,
83    buffer: Vec<u8>,
84) -> TransportResult<()> {
85    let result = timeout(SEND_DATA_TIMEOUT, connection.send_data(&buffer)).await;
86    ASYNC_TASK_SEMAPHORE.release();
87    handle_send_result(result, connection)
88}
89
90async fn send_data_batch_async(
91    connection: Arc<NonblockingQuicConnection>,
92    buffers: Vec<Vec<u8>>,
93) -> TransportResult<()> {
94    let result = timeout(
95        u32::try_from(buffers.len())
96            .map(|size| SEND_DATA_TIMEOUT.saturating_mul(size))
97            .unwrap_or(Duration::MAX),
98        connection.send_data_batch(&buffers),
99    )
100    .await;
101    ASYNC_TASK_SEMAPHORE.release();
102    handle_send_result(result, connection)
103}
104
105/// Check the send result and update stats if timedout. Returns the checked result.
106fn handle_send_result(
107    result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
108    connection: Arc<NonblockingQuicConnection>,
109) -> Result<(), TransportError> {
110    match result {
111        Ok(result) => result,
112        Err(_err) => {
113            let client_stats = ClientStats::default();
114            client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
115            let stats = connection.connection_stats();
116            stats.add_client_stats(&client_stats, 0, false);
117            info!("Timedout sending data {:?}", connection.server_addr());
118            Err(TransportError::Custom("Timedout sending data".to_string()))
119        }
120    }
121}
122
/// Blocking QUIC client connection: a thin wrapper that drives a shared
/// nonblocking connection on this module's runtime.
pub struct QuicClientConnection {
    /// The underlying nonblocking connection; cloned into spawned send tasks.
    pub inner: Arc<NonblockingQuicConnection>,
}
126
127impl QuicClientConnection {
128    pub fn new(
129        endpoint: Arc<QuicLazyInitializedEndpoint>,
130        server_addr: SocketAddr,
131        connection_stats: Arc<ConnectionCacheStats>,
132    ) -> Self {
133        let inner = Arc::new(NonblockingQuicConnection::new(
134            endpoint,
135            server_addr,
136            connection_stats,
137        ));
138        Self { inner }
139    }
140
141    pub fn new_with_client(
142        client: Arc<QuicClient>,
143        connection_stats: Arc<ConnectionCacheStats>,
144    ) -> Self {
145        let inner = Arc::new(NonblockingQuicConnection::new_with_client(
146            client,
147            connection_stats,
148        ));
149        Self { inner }
150    }
151}
152
153impl ClientConnection for QuicClientConnection {
154    fn server_addr(&self) -> &SocketAddr {
155        self.inner.server_addr()
156    }
157
158    fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
159        RUNTIME.block_on(self.inner.send_data_batch(buffers))?;
160        Ok(())
161    }
162
163    fn send_data_async(&self, data: Vec<u8>) -> TransportResult<()> {
164        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
165        let inner = self.inner.clone();
166
167        let _handle = RUNTIME.spawn(send_data_async(inner, data));
168        Ok(())
169    }
170
171    fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
172        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
173        let inner = self.inner.clone();
174        let _handle = RUNTIME.spawn(send_data_batch_async(inner, buffers));
175        Ok(())
176    }
177
178    fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
179        RUNTIME.block_on(self.inner.send_data(buffer))?;
180        Ok(())
181    }
182}
183
184pub(crate) fn close_quic_connection(connection: Arc<QuicClient>) {
185    // Close the connection and release resources
186    trace!("Closing QUIC connection to {}", connection.server_addr());
187    RUNTIME.block_on(connection.close());
188}