// solana_tpu_client_next/connection_workers_scheduler.rs
1//! This module defines [`ConnectionWorkersScheduler`] which sends transactions
2//! to the upcoming leaders.
3
4use {
5 super::leader_updater::LeaderUpdater,
6 crate::{
7 connection_worker::DEFAULT_MAX_CONNECTION_HANDSHAKE_TIMEOUT,
8 logging::debug,
9 quic_networking::{
10 create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
11 },
12 transaction_batch::TransactionBatch,
13 workers_cache::{shutdown_worker, WorkersCache, WorkersCacheError},
14 SendTransactionStats,
15 },
16 async_trait::async_trait,
17 quinn::{ClientConfig, Endpoint},
18 solana_keypair::Keypair,
19 std::{
20 net::{SocketAddr, UdpSocket},
21 sync::Arc,
22 },
23 thiserror::Error,
24 tokio::sync::{mpsc, watch},
25 tokio_util::sync::CancellationToken,
26};
/// Receiving half of the channel that delivers [`TransactionBatch`]es to the
/// scheduler.
pub type TransactionReceiver = mpsc::Receiver<TransactionBatch>;
28
/// The [`ConnectionWorkersScheduler`] sends transactions from the provided
/// receiver channel to upcoming leaders. It obtains information about future
/// leaders from the implementation of the [`LeaderUpdater`] trait.
///
/// Internally, it enables the management and coordination of multiple network
/// connections, schedules and oversees connection workers.
pub struct ConnectionWorkersScheduler {
    // Source of information about upcoming leaders.
    leader_updater: Box<dyn LeaderUpdater>,
    // Channel from which batches of transactions to be sent are received.
    transaction_receiver: TransactionReceiver,
    // Watch channel delivering stake-identity updates used to rebuild the
    // endpoint's client certificate at runtime.
    update_identity_receiver: watch::Receiver<Option<StakeIdentity>>,
    // Token used to request shutdown of the scheduler loop.
    cancel: CancellationToken,
    // Shared statistics about transaction sending, exposed via `get_stats`.
    stats: Arc<SendTransactionStats>,
}
42
/// Errors that arise from running [`ConnectionWorkersScheduler`].
#[derive(Debug, Error)]
pub enum ConnectionWorkersSchedulerError {
    /// Error originating from the QUIC networking layer (endpoint setup or
    /// connection handling).
    #[error(transparent)]
    QuicError(#[from] QuicError),
    /// Error reported by the workers cache while dispatching to a worker.
    #[error(transparent)]
    WorkersCacheError(#[from] WorkersCacheError),
    /// The source of upcoming leaders was dropped unexpectedly.
    #[error("Leader receiver unexpectedly dropped.")]
    LeaderReceiverDropped,
}
53
/// [`Fanout`] is a configuration struct that specifies how many leaders should
/// be targeted when sending transactions and connecting.
///
/// Note, that the unit is number of leaders per
/// [`solana_clock::NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is
/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
///
/// The idea of having a separate `connect` parameter is to create a set of
/// nodes to connect to in advance in order to hide the latency of opening new
/// connection. Hence, `connect` must be greater or equal to `send`.
#[derive(Debug, Clone)]
pub struct Fanout {
    /// The number of leaders to target for sending transactions.
    pub send: usize,

    /// The number of leaders to target for establishing connections.
    /// Must be greater than or equal to `send`.
    pub connect: usize,
}
73
/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
/// workers, including network binding, identity, connection limits, and
/// behavior related to transaction handling.
pub struct ConnectionWorkersSchedulerConfig {
    /// The local address to bind the scheduler to.
    pub bind: BindTarget,

    /// Optional stake identity keypair used in the endpoint certificate for
    /// identifying the sender.
    pub stake_identity: Option<StakeIdentity>,

    /// The number of connections to be maintained by the scheduler.
    pub num_connections: usize,

    /// Whether to skip checking the transaction blockhash expiration.
    pub skip_check_transaction_age: bool,

    /// The size of the channel used to transmit transaction batches to the
    /// worker tasks. When a worker's channel is full, additional batches for
    /// that worker may be dropped (see `NonblockingBroadcaster`).
    pub worker_channel_size: usize,

    /// The maximum number of reconnection attempts allowed in case of
    /// connection failure.
    pub max_reconnect_attempts: usize,

    /// Configures the number of leaders to connect to and send transactions to.
    pub leaders_fanout: Fanout,
}
104
/// The [`BindTarget`] enum defines how the UDP socket should be bound:
/// either by providing a [`SocketAddr`] or an existing [`UdpSocket`].
pub enum BindTarget {
    /// A local address to bind a new UDP socket to.
    Address(SocketAddr),
    /// An existing, already-bound UDP socket supplied by the caller.
    Socket(UdpSocket),
}
111
/// The [`StakeIdentity`] structure provides a convenient abstraction for handling
/// [`Keypair`] when creating a QUIC certificate. Since `Keypair` does not implement
/// [`Clone`], it cannot be moved in situations where [`ConnectionWorkersSchedulerConfig`]
/// needs to be transferred. This wrapper structure is built from a `&Keypair` and
/// stores the resulting certificate internally; the certificate is later consumed
/// by [`ConnectionWorkersScheduler`] to create an endpoint.
pub struct StakeIdentity(QuicClientCertificate);
119
120impl StakeIdentity {
121 pub fn new(keypair: &Keypair) -> Self {
122 Self(QuicClientCertificate::new(Some(keypair)))
123 }
124
125 pub fn as_certificate(&self) -> &QuicClientCertificate {
126 &self.0
127 }
128}
129
130impl From<StakeIdentity> for QuicClientCertificate {
131 fn from(identity: StakeIdentity) -> Self {
132 identity.0
133 }
134}
135
/// The [`WorkersBroadcaster`] trait defines a customizable mechanism for
/// sending transaction batches to workers corresponding to the provided list of
/// addresses. Implementations of this trait are used by the
/// [`ConnectionWorkersScheduler`] to distribute transactions to workers
/// accordingly.
#[async_trait]
pub trait WorkersBroadcaster {
    /// Sends a `transaction_batch` to workers associated with the given
    /// `leaders` addresses.
    ///
    /// * `workers` - cache of connection workers, keyed by leader address.
    /// * `leaders` - addresses of the leaders to deliver the batch to.
    /// * `transaction_batch` - the batch of transactions to deliver.
    ///
    /// Returns error if a critical issue occurs, e.g. the implementation
    /// encounters an unrecoverable error. In this case, it will trigger
    /// stopping the scheduler and cleaning all the data.
    async fn send_to_workers(
        workers: &mut WorkersCache,
        leaders: &[SocketAddr],
        transaction_batch: TransactionBatch,
    ) -> Result<(), ConnectionWorkersSchedulerError>;
}
155
impl ConnectionWorkersScheduler {
    /// Creates the scheduler, which manages the distribution of transactions to
    /// the network's upcoming leaders.
    ///
    /// * `leader_updater` - source of information about upcoming leaders.
    /// * `transaction_receiver` - channel delivering transaction batches to send.
    /// * `update_identity_receiver` - watch channel delivering stake-identity
    ///   updates; a `None` value results in a certificate built without a keypair.
    /// * `cancel` - token used to request shutdown of the scheduler.
    pub fn new(
        leader_updater: Box<dyn LeaderUpdater>,
        transaction_receiver: mpsc::Receiver<TransactionBatch>,
        update_identity_receiver: watch::Receiver<Option<StakeIdentity>>,
        cancel: CancellationToken,
    ) -> Self {
        let stats = Arc::new(SendTransactionStats::default());
        Self {
            leader_updater,
            transaction_receiver,
            update_identity_receiver,
            cancel,
            stats,
        }
    }

    /// Retrieves a reference to the statistics of the scheduler.
    /// The returned [`Arc`] stays valid after the scheduler has been consumed
    /// by `run`/`run_with_broadcaster`.
    pub fn get_stats(&self) -> Arc<SendTransactionStats> {
        self.stats.clone()
    }

    /// Starts the scheduler.
    ///
    /// This method is a shorthand for
    /// [`ConnectionWorkersScheduler::run_with_broadcaster`] using
    /// `NonblockingBroadcaster` strategy.
    ///
    /// Transactions that fail to be delivered to workers due to full channels
    /// will be dropped. The same for transactions that failed to be delivered
    /// over the network.
    pub async fn run(
        self,
        config: ConnectionWorkersSchedulerConfig,
    ) -> Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError> {
        self.run_with_broadcaster::<NonblockingBroadcaster>(config)
            .await
    }

    /// Starts the scheduler, which manages the distribution of transactions to
    /// the network's upcoming leaders. `Broadcaster` allows to customize the
    /// way transactions are send to the leaders, see [`WorkersBroadcaster`].
    ///
    /// Runs the main loop that handles worker scheduling and management for
    /// connections. Returns [`SendTransactionStats`] or an error.
    ///
    /// Importantly, if some transactions were not delivered due to network
    /// problems, they will not be retried when the problem is resolved.
    pub async fn run_with_broadcaster<Broadcaster: WorkersBroadcaster>(
        self,
        ConnectionWorkersSchedulerConfig {
            bind,
            stake_identity,
            num_connections,
            skip_check_transaction_age,
            worker_channel_size,
            max_reconnect_attempts,
            leaders_fanout,
        }: ConnectionWorkersSchedulerConfig,
    ) -> Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError> {
        let ConnectionWorkersScheduler {
            mut leader_updater,
            mut transaction_receiver,
            mut update_identity_receiver,
            cancel,
            stats,
        } = self;
        // Create the QUIC endpoint using the initial (possibly absent) stake
        // identity; the client config may be replaced later on identity update.
        let mut endpoint = setup_endpoint(bind, stake_identity)?;

        debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
        let mut workers = WorkersCache::new(num_connections, cancel.clone());

        // Remembers a broadcaster failure so it can be returned after cleanup.
        let mut last_error = None;
        // flag to ensure that the section handling
        // `update_identity_receiver.changed()` is entered only once when the
        // channel is dropped.
        let mut identity_updater_is_active = true;

        loop {
            // Wait for the next batch; identity updates and cancellation are
            // handled inline and `continue`/`break` the loop respectively.
            let transaction_batch: TransactionBatch = tokio::select! {
                recv_res = transaction_receiver.recv() => match recv_res {
                    Some(txs) => txs,
                    None => {
                        debug!("End of `transaction_receiver`: shutting down.");
                        break;
                    }
                },
                res = update_identity_receiver.changed(), if identity_updater_is_active => {
                    let Ok(()) = res else {
                        // Sender has been dropped; log and continue
                        debug!("Certificate update channel closed; continuing without further updates.");
                        identity_updater_is_active = false;
                        continue;
                    };

                    // Rebuild the client config with the newly received
                    // identity and install it on the endpoint; only future
                    // connections pick it up.
                    let client_config = build_client_config(update_identity_receiver.borrow_and_update().as_ref());
                    endpoint.set_default_client_config(client_config);
                    // Flush workers since they are handling connections created
                    // with outdated certificate.
                    workers.flush();
                    debug!("Updated certificate.");
                    continue;
                },
                () = cancel.cancelled() => {
                    debug!("Cancelled: Shutting down");
                    break;
                }
            };

            // `connect` fanout is a superset of `send` fanout (see [`Fanout`]).
            let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect);
            let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send);

            // add future leaders to the cache to hide the latency of opening
            // the connection.
            for peer in connect_leaders {
                // `ensure_worker` may evict an existing worker to make room;
                // the evicted worker must be shut down explicitly.
                if let Some(evicted_worker) = workers.ensure_worker(
                    peer,
                    &endpoint,
                    worker_channel_size,
                    skip_check_transaction_age,
                    max_reconnect_attempts,
                    DEFAULT_MAX_CONNECTION_HANDSHAKE_TIMEOUT,
                    stats.clone(),
                ) {
                    shutdown_worker(evicted_worker);
                }
            }

            // A broadcaster error is considered unrecoverable: stop the loop
            // and propagate the error after cleanup below.
            if let Err(error) =
                Broadcaster::send_to_workers(&mut workers, &send_leaders, transaction_batch).await
            {
                last_error = Some(error);
                break;
            }
        }

        // Cleanup: stop workers, close the endpoint, stop the leader updater.
        workers.shutdown().await;

        endpoint.close(0u32.into(), b"Closing connection");
        leader_updater.stop().await;
        if let Some(error) = last_error {
            return Err(error);
        }
        Ok(stats)
    }
}
304
305/// Sets up the QUIC endpoint for the scheduler to handle connections.
306pub fn setup_endpoint(
307 bind: BindTarget,
308 stake_identity: Option<StakeIdentity>,
309) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
310 let client_config = build_client_config(stake_identity.as_ref());
311 let endpoint = create_client_endpoint(bind, client_config)?;
312 Ok(endpoint)
313}
314
315fn build_client_config(stake_identity: Option<&StakeIdentity>) -> ClientConfig {
316 let client_certificate = match stake_identity {
317 Some(identity) => identity.as_certificate(),
318 None => &QuicClientCertificate::new(None),
319 };
320 create_client_config(client_certificate)
321}
322
/// [`NonblockingBroadcaster`] attempts to immediately send transactions to all
/// the workers. If a worker cannot accept transactions because its channel is
/// full, the transactions will not be sent to this worker.
struct NonblockingBroadcaster;
327
328#[async_trait]
329impl WorkersBroadcaster for NonblockingBroadcaster {
330 async fn send_to_workers(
331 workers: &mut WorkersCache,
332 leaders: &[SocketAddr],
333 transaction_batch: TransactionBatch,
334 ) -> Result<(), ConnectionWorkersSchedulerError> {
335 for new_leader in leaders {
336 let send_res =
337 workers.try_send_transactions_to_address(new_leader, transaction_batch.clone());
338 if let Err(err) = send_res {
339 debug!("Failed to send transactions to {new_leader:?}, worker send error: {err}.");
340 if err == WorkersCacheError::ReceiverDropped {
341 // Remove the worker from the cache if the peer has disconnected.
342 if let Some(pop_worker) = workers.pop(*new_leader) {
343 shutdown_worker(pop_worker)
344 }
345 }
346 }
347 }
348 Ok(())
349 }
350}
351
/// Extracts a list of unique leader addresses to which transactions will be sent.
///
/// Takes the first `send_fanout` entries of `leaders` (or all of them when fewer
/// are available) and drops repeated addresses, keeping the original order.
pub fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec<SocketAddr> {
    let mut selected = Vec::with_capacity(send_fanout.min(leaders.len()));
    for address in leaders.iter().take(send_fanout) {
        if !selected.contains(address) {
            selected.push(*address);
        }
    }
    selected
}
360
/// Removes duplicate `SocketAddr` elements from the given slice while
/// preserving their original order.
fn remove_duplicates(input: &[SocketAddr]) -> Vec<SocketAddr> {
    input
        .iter()
        .fold(Vec::with_capacity(input.len()), |mut unique, address| {
            if !unique.contains(address) {
                unique.push(*address);
            }
            unique
        })
}