Skip to main content

solana_tpu_client_next/
connection_workers_scheduler.rs

1//! This module defines [`ConnectionWorkersScheduler`] which sends transactions
2//! to the upcoming leaders.
3
4use {
5    super::leader_updater::LeaderUpdater,
6    crate::{
7        connection_worker::DEFAULT_MAX_CONNECTION_HANDSHAKE_TIMEOUT,
8        logging::debug,
9        quic_networking::{
10            create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
11        },
12        transaction_batch::TransactionBatch,
13        workers_cache::{shutdown_worker, WorkersCache, WorkersCacheError},
14        SendTransactionStats,
15    },
16    async_trait::async_trait,
17    quinn::{ClientConfig, Endpoint},
18    solana_keypair::Keypair,
19    std::{
20        net::{SocketAddr, UdpSocket},
21        sync::Arc,
22    },
23    thiserror::Error,
24    tokio::sync::{mpsc, watch},
25    tokio_util::sync::CancellationToken,
26};
27pub type TransactionReceiver = mpsc::Receiver<TransactionBatch>;
28
29/// The [`ConnectionWorkersScheduler`] sends transactions from the provided
30/// receiver channel to upcoming leaders. It obtains information about future
31/// leaders from the implementation of the [`LeaderUpdater`] trait.
32///
33/// Internally, it enables the management and coordination of multiple network
34/// connections, schedules and oversees connection workers.
35pub struct ConnectionWorkersScheduler {
36    leader_updater: Box<dyn LeaderUpdater>,
37    transaction_receiver: TransactionReceiver,
38    update_identity_receiver: watch::Receiver<Option<StakeIdentity>>,
39    cancel: CancellationToken,
40    stats: Arc<SendTransactionStats>,
41}
42
43/// Errors that arise from running [`ConnectionWorkersSchedulerError`].
44#[derive(Debug, Error)]
45pub enum ConnectionWorkersSchedulerError {
46    #[error(transparent)]
47    QuicError(#[from] QuicError),
48    #[error(transparent)]
49    WorkersCacheError(#[from] WorkersCacheError),
50    #[error("Leader receiver unexpectedly dropped.")]
51    LeaderReceiverDropped,
52}
53
54/// [`Fanout`] is a configuration struct that specifies how many leaders should
55/// be targeted when sending transactions and connecting.
56///
57/// Note, that the unit is number of leaders per
58/// [`solana_clock::NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is
59/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
60/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
61///
62/// The idea of having a separate `connect` parameter is to create a set of
63/// nodes to connect to in advance in order to hide the latency of opening new
64/// connection. Hence, `connect` must be greater or equal to `send`
65#[derive(Debug, Clone)]
66pub struct Fanout {
67    /// The number of leaders to target for sending transactions.
68    pub send: usize,
69
70    /// The number of leaders to target for establishing connections.
71    pub connect: usize,
72}
73
74/// Configuration for the [`ConnectionWorkersScheduler`].
75///
76/// This struct holds the necessary settings to initialize and manage connection
77/// workers, including network binding, identity, connection limits, and
78/// behavior related to transaction handling.
79pub struct ConnectionWorkersSchedulerConfig {
80    /// The local address to bind the scheduler to.
81    pub bind: BindTarget,
82
83    /// Optional stake identity keypair used in the endpoint certificate for
84    /// identifying the sender.
85    pub stake_identity: Option<StakeIdentity>,
86
87    /// The number of connections to be maintained by the scheduler.
88    pub num_connections: usize,
89
90    /// Whether to skip checking the transaction blockhash expiration.
91    pub skip_check_transaction_age: bool,
92
93    /// The size of the channel used to transmit transaction batches to the
94    /// worker tasks.
95    pub worker_channel_size: usize,
96
97    /// The maximum number of reconnection attempts allowed in case of
98    /// connection failure.
99    pub max_reconnect_attempts: usize,
100
101    /// Configures the number of leaders to connect to and send transactions to.
102    pub leaders_fanout: Fanout,
103}
104
105/// The [`BindTarget`] enum defines how the UDP socket should be bound:
106/// either by providing a [`SocketAddr`] or an existing [`UdpSocket`].
107pub enum BindTarget {
108    Address(SocketAddr),
109    Socket(UdpSocket),
110}
111
112/// The [`StakeIdentity`] structure provides a convenient abstraction for handling
113/// [`Keypair`] when creating a QUIC certificate. Since `Keypair` does not implement
114/// [`Clone`], it cannot be moved in situations where [`ConnectionWorkersSchedulerConfig`]
115/// needs to be transferred. This wrapper structure allows the use of either a `Keypair`
116/// or a `&Keypair` to create a certificate, which is stored internally and later
117/// consumed by [`ConnectionWorkersScheduler`] to create an endpoint.
118pub struct StakeIdentity(QuicClientCertificate);
119
120impl StakeIdentity {
121    pub fn new(keypair: &Keypair) -> Self {
122        Self(QuicClientCertificate::new(Some(keypair)))
123    }
124
125    pub fn as_certificate(&self) -> &QuicClientCertificate {
126        &self.0
127    }
128}
129
130impl From<StakeIdentity> for QuicClientCertificate {
131    fn from(identity: StakeIdentity) -> Self {
132        identity.0
133    }
134}
135
136/// The [`WorkersBroadcaster`] trait defines a customizable mechanism for
137/// sending transaction batches to workers corresponding to the provided list of
138/// addresses. Implementations of this trait are used by the
139/// [`ConnectionWorkersScheduler`] to distribute transactions to workers
140/// accordingly.
141#[async_trait]
142pub trait WorkersBroadcaster {
143    /// Sends a `transaction_batch` to workers associated with the given
144    /// `leaders` addresses.
145    ///
146    /// Returns error if a critical issue occurs, e.g. the implementation
147    /// encounters an unrecoverable error. In this case, it will trigger
148    /// stopping the scheduler and cleaning all the data.
149    async fn send_to_workers(
150        workers: &mut WorkersCache,
151        leaders: &[SocketAddr],
152        transaction_batch: TransactionBatch,
153    ) -> Result<(), ConnectionWorkersSchedulerError>;
154}
155
156impl ConnectionWorkersScheduler {
157    /// Creates the scheduler, which manages the distribution of transactions to
158    /// the network's upcoming leaders.
159    pub fn new(
160        leader_updater: Box<dyn LeaderUpdater>,
161        transaction_receiver: mpsc::Receiver<TransactionBatch>,
162        update_identity_receiver: watch::Receiver<Option<StakeIdentity>>,
163        cancel: CancellationToken,
164    ) -> Self {
165        let stats = Arc::new(SendTransactionStats::default());
166        Self {
167            leader_updater,
168            transaction_receiver,
169            update_identity_receiver,
170            cancel,
171            stats,
172        }
173    }
174
175    /// Retrieves a reference to the statistics of the scheduler
176    pub fn get_stats(&self) -> Arc<SendTransactionStats> {
177        self.stats.clone()
178    }
179
180    /// Starts the scheduler.
181    ///
182    /// This method is a shorthand for
183    /// [`ConnectionWorkersScheduler::run_with_broadcaster`] using
184    /// `NonblockingBroadcaster` strategy.
185    ///
186    /// Transactions that fail to be delivered to workers due to full channels
187    /// will be dropped. The same for transactions that failed to be delivered
188    /// over the network.
189    pub async fn run(
190        self,
191        config: ConnectionWorkersSchedulerConfig,
192    ) -> Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError> {
193        self.run_with_broadcaster::<NonblockingBroadcaster>(config)
194            .await
195    }
196
197    /// Starts the scheduler, which manages the distribution of transactions to
198    /// the network's upcoming leaders. `Broadcaster` allows to customize the
199    /// way transactions are send to the leaders, see [`WorkersBroadcaster`].
200    ///
201    /// Runs the main loop that handles worker scheduling and management for
202    /// connections. Returns [`SendTransactionStats`] or an error.
203    ///
204    /// Importantly, if some transactions were not delivered due to network
205    /// problems, they will not be retried when the problem is resolved.
206    pub async fn run_with_broadcaster<Broadcaster: WorkersBroadcaster>(
207        self,
208        ConnectionWorkersSchedulerConfig {
209            bind,
210            stake_identity,
211            num_connections,
212            skip_check_transaction_age,
213            worker_channel_size,
214            max_reconnect_attempts,
215            leaders_fanout,
216        }: ConnectionWorkersSchedulerConfig,
217    ) -> Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError> {
218        let ConnectionWorkersScheduler {
219            mut leader_updater,
220            mut transaction_receiver,
221            mut update_identity_receiver,
222            cancel,
223            stats,
224        } = self;
225        let mut endpoint = setup_endpoint(bind, stake_identity)?;
226
227        debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
228        let mut workers = WorkersCache::new(num_connections, cancel.clone());
229
230        let mut last_error = None;
231        // flag to ensure that the section handling
232        // `update_identity_receiver.changed()` is entered only once when the
233        // channel is dropped.
234        let mut identity_updater_is_active = true;
235
236        loop {
237            let transaction_batch: TransactionBatch = tokio::select! {
238                recv_res = transaction_receiver.recv() => match recv_res {
239                    Some(txs) => txs,
240                    None => {
241                        debug!("End of `transaction_receiver`: shutting down.");
242                        break;
243                    }
244                },
245                res = update_identity_receiver.changed(), if identity_updater_is_active => {
246                    let Ok(()) = res else {
247                        // Sender has been dropped; log and continue
248                        debug!("Certificate update channel closed; continuing without further updates.");
249                        identity_updater_is_active = false;
250                        continue;
251                    };
252
253                    let client_config = build_client_config(update_identity_receiver.borrow_and_update().as_ref());
254                    endpoint.set_default_client_config(client_config);
255                    // Flush workers since they are handling connections created
256                    // with outdated certificate.
257                    workers.flush();
258                    debug!("Updated certificate.");
259                    continue;
260                },
261                () = cancel.cancelled() => {
262                    debug!("Cancelled: Shutting down");
263                    break;
264                }
265            };
266
267            let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect);
268            let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send);
269
270            // add future leaders to the cache to hide the latency of opening
271            // the connection.
272            for peer in connect_leaders {
273                if let Some(evicted_worker) = workers.ensure_worker(
274                    peer,
275                    &endpoint,
276                    worker_channel_size,
277                    skip_check_transaction_age,
278                    max_reconnect_attempts,
279                    DEFAULT_MAX_CONNECTION_HANDSHAKE_TIMEOUT,
280                    stats.clone(),
281                ) {
282                    shutdown_worker(evicted_worker);
283                }
284            }
285
286            if let Err(error) =
287                Broadcaster::send_to_workers(&mut workers, &send_leaders, transaction_batch).await
288            {
289                last_error = Some(error);
290                break;
291            }
292        }
293
294        workers.shutdown().await;
295
296        endpoint.close(0u32.into(), b"Closing connection");
297        leader_updater.stop().await;
298        if let Some(error) = last_error {
299            return Err(error);
300        }
301        Ok(stats)
302    }
303}
304
305/// Sets up the QUIC endpoint for the scheduler to handle connections.
306pub fn setup_endpoint(
307    bind: BindTarget,
308    stake_identity: Option<StakeIdentity>,
309) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
310    let client_config = build_client_config(stake_identity.as_ref());
311    let endpoint = create_client_endpoint(bind, client_config)?;
312    Ok(endpoint)
313}
314
315fn build_client_config(stake_identity: Option<&StakeIdentity>) -> ClientConfig {
316    let client_certificate = match stake_identity {
317        Some(identity) => identity.as_certificate(),
318        None => &QuicClientCertificate::new(None),
319    };
320    create_client_config(client_certificate)
321}
322
323/// [`NonblockingBroadcaster`] attempts to immediately send transactions to all
324/// the workers. If worker cannot accept transactions because it's channel is
325/// full, the transactions will not be sent to this worker.
326struct NonblockingBroadcaster;
327
328#[async_trait]
329impl WorkersBroadcaster for NonblockingBroadcaster {
330    async fn send_to_workers(
331        workers: &mut WorkersCache,
332        leaders: &[SocketAddr],
333        transaction_batch: TransactionBatch,
334    ) -> Result<(), ConnectionWorkersSchedulerError> {
335        for new_leader in leaders {
336            let send_res =
337                workers.try_send_transactions_to_address(new_leader, transaction_batch.clone());
338            if let Err(err) = send_res {
339                debug!("Failed to send transactions to {new_leader:?}, worker send error: {err}.");
340                if err == WorkersCacheError::ReceiverDropped {
341                    // Remove the worker from the cache if the peer has disconnected.
342                    if let Some(pop_worker) = workers.pop(*new_leader) {
343                        shutdown_worker(pop_worker)
344                    }
345                }
346            }
347        }
348        Ok(())
349    }
350}
351
352/// Extracts a list of unique leader addresses to which transactions will be sent.
353///
354/// This function selects up to `send_fanout` addresses from the `leaders` list, ensuring that
355/// only unique addresses are included while maintaining their original order.
356pub fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec<SocketAddr> {
357    let send_count = send_fanout.min(leaders.len());
358    remove_duplicates(&leaders[..send_count])
359}
360
361/// Removes duplicate `SocketAddr` elements from the given slice while
362/// preserving their original order.
363fn remove_duplicates(input: &[SocketAddr]) -> Vec<SocketAddr> {
364    let mut res = Vec::with_capacity(input.len());
365    for address in input {
366        if !res.contains(address) {
367            res.push(*address);
368        }
369    }
370    res
371}