Skip to main content

tycho_core/overlay_client/
validators.rs

1use std::borrow::Borrow;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use futures_util::StreamExt;
6use futures_util::stream::FuturesUnordered;
7use tokio::sync::mpsc;
8use tokio::task::AbortHandle;
9use tycho_network::{KnownPeerHandle, Network, PeerId, PeerResolver, PublicOverlay, Request};
10use tycho_types::models::ValidatorSet;
11use tycho_util::FastHashSet;
12use tycho_util::futures::JoinTask;
13use tycho_util::metrics::HistogramGuard;
14
15use crate::block_strider::{BlockSubscriber, BlockSubscriberContext};
16use crate::overlay_client::config::ValidatorsConfig;
17use crate::proto::overlay;
18
/// A source of validator peer ids that can be fed into the validators resolver.
pub trait ValidatorSetPeers {
    /// Returns the peer ids described by this value, collected into a set.
    fn get_peers(&self) -> FastHashSet<PeerId>;
}
22
23impl ValidatorSetPeers for ValidatorSet {
24    fn get_peers(&self) -> FastHashSet<PeerId> {
25        self.list.iter().map(|x| PeerId(x.public_key.0)).collect()
26    }
27}
28
29impl<T: Borrow<PeerId>> ValidatorSetPeers for [T] {
30    fn get_peers(&self) -> FastHashSet<PeerId> {
31        self.iter().map(|x| *x.borrow()).collect()
32    }
33}
34
35impl<T: Borrow<PeerId>> ValidatorSetPeers for Vec<T> {
36    fn get_peers(&self) -> FastHashSet<PeerId> {
37        self.iter().map(|x| *x.borrow()).collect()
38    }
39}
40
41impl ValidatorSetPeers for FastHashSet<PeerId> {
42    fn get_peers(&self) -> FastHashSet<PeerId> {
43        self.clone()
44    }
45}
46
/// Handle to the validators resolver.
///
/// All state lives behind an `Arc`, so clones share the same resolver.
#[derive(Clone)]
pub struct ValidatorsResolver {
    inner: Arc<Inner>,
}
51
52impl ValidatorsResolver {
53    pub fn new(network: Network, overlay: PublicOverlay, config: ValidatorsConfig) -> Self {
54        let (peers_tx, peers_rx) = mpsc::unbounded_channel();
55
56        let peer_resolver = overlay.peer_resolver().clone();
57
58        let validators = Arc::new(Validators {
59            config,
60            resolved: Default::default(),
61            targets: Default::default(),
62            network,
63            overlay,
64            current_epoch: Default::default(),
65            target_validators_gauge: metrics::gauge!("tycho_core_overlay_client_target_validators"),
66            resolved_validators_gauge: metrics::gauge!(
67                "tycho_core_overlay_client_resolved_validators"
68            ),
69        });
70
71        let resolver_worker_handle = tokio::spawn({
72            let validators = validators.clone();
73            async move {
74                if let Some(peer_resolver) = peer_resolver {
75                    validators.listen(peers_rx, peer_resolver).await;
76                }
77            }
78        });
79
80        Self {
81            inner: Arc::new(Inner {
82                validators,
83                peers_tx,
84
85                resolver_worker_handle: resolver_worker_handle.abort_handle(),
86            }),
87        }
88    }
89
90    pub fn update_validator_set<T: ValidatorSetPeers>(&self, vset: &T) {
91        let new_peers = vset.get_peers();
92        self.inner.peers_tx.send(new_peers).ok();
93    }
94
95    pub fn get_broadcast_targets(&self) -> Arc<Vec<Validator>> {
96        self.inner.validators.targets.load_full()
97    }
98}
99
100impl BlockSubscriber for ValidatorsResolver {
101    type Prepared = ();
102    type PrepareBlockFut<'a> = futures_util::future::Ready<anyhow::Result<()>>;
103    type HandleBlockFut<'a> = futures_util::future::Ready<anyhow::Result<()>>;
104
105    fn prepare_block<'a>(&'a self, _: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
106        futures_util::future::ready(Ok(()))
107    }
108
109    fn handle_block<'a>(
110        &'a self,
111        cx: &'a BlockSubscriberContext,
112        _: Self::Prepared,
113    ) -> Self::HandleBlockFut<'a> {
114        if !cx.is_key_block {
115            return futures_util::future::ready(Ok(()));
116        }
117        tracing::info!("updating validators");
118        let config = match cx.block.load_custom() {
119            Ok(extra) => &extra.config,
120            Err(e) => {
121                return futures_util::future::ready(Err(anyhow::anyhow!(
122                    "failed to load mc block extra: {e:?}"
123                )));
124            }
125        };
126
127        if let Some(config) = config {
128            match config.get_current_validator_set() {
129                Ok(vset) => self.update_validator_set(&vset),
130                Err(e) => {
131                    tracing::error!("failed to get validator set from blockchain config: {e:?}");
132                }
133            }
134        }
135
136        futures_util::future::ready(Ok(()))
137    }
138}
139
/// Shared state behind a `ValidatorsResolver` handle.
struct Inner {
    // State shared with the background worker task.
    validators: Arc<Validators>,
    // Sending side of the channel used to deliver new peer sets to the worker.
    peers_tx: PeersTx,

    // Abort handle of the spawned worker task; used in `Drop`.
    resolver_worker_handle: AbortHandle,
}
146
147impl Drop for Inner {
148    fn drop(&mut self) {
149        tracing::info!("stopping validators resolver");
150        self.resolver_worker_handle.abort();
151    }
152}
153
/// A resolved validator peer.
///
/// The data is stored behind an `Arc`, so clones refer to the same underlying entry.
#[derive(Clone)]
pub struct Validator {
    inner: Arc<ValidatorInner>,
}
158
/// Data shared by all clones of a `Validator`.
struct ValidatorInner {
    // Handle of the resolved peer; the peer info is read through it.
    handle: KnownPeerHandle,
}
162
163impl Validator {
164    pub fn peer_id(&self) -> PeerId {
165        self.inner.handle.peer_info().id
166    }
167
168    pub fn is_expired(&self, now: u32) -> bool {
169        const NEW_THRESHOLD: u32 = 1800; // 30 minutes
170
171        let peer_info = self.inner.handle.peer_info();
172        let is_quite_old = peer_info.created_at + NEW_THRESHOLD < now;
173        is_quite_old || peer_info.expires_at < now
174    }
175}
176
177impl std::fmt::Debug for Validator {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        f.debug_struct("Validator")
180            .field("peer_id", &self.peer_id())
181            .finish()
182    }
183}
184
/// State shared between the resolver handle and its background tasks.
struct Validators {
    // Settings read by the tracking task (`keep`, `ping_interval`, `ping_timeout`).
    config: ValidatorsConfig,

    // All resolved validators from the current set
    resolved: ArcSwap<Vec<Validator>>,

    // A small random subset of possibly alive validators
    targets: ArcSwap<Vec<Validator>>,

    // Network and overlay used to send ping queries to validators.
    network: Network,
    overlay: PublicOverlay,

    // Incremented for every new validator set.
    // NOTE: Mutex is used instead of atomic since we need a larger scope of locking:
    // the tracking task checks the epoch and stores `targets` under the same guard.
    current_epoch: parking_lot::Mutex<usize>,

    // Gauges reporting the sizes of `targets` and `resolved`.
    target_validators_gauge: metrics::Gauge,
    resolved_validators_gauge: metrics::Gauge,
}
203
204impl Validators {
205    #[tracing::instrument(name = "resolve_validators", skip_all)]
206    async fn listen(self: &Arc<Self>, mut peers_rx: PeersRx, peer_resolver: PeerResolver) {
207        tracing::info!("started");
208        scopeguard::defer! { tracing::info!("finished"); };
209
210        let mut current_peers = None;
211
212        let local_id = peer_resolver.dht_service().local_id();
213
214        loop {
215            tokio::select! {
216                maybe_peers = peers_rx.recv() => {
217                    match maybe_peers {
218                        Some(peers) => {
219                            current_peers = Some(peers);
220                        }
221                        None => break,
222                    }
223                }
224                _ = async {
225                    let Some(mut peers) = current_peers.take() else {
226                        futures_util::future::pending().await
227                    };
228
229                    let epoch = self.prepare_peers(&mut peers, local_id);
230
231                    // Start tracking the resolved validators in the background
232                    let this = self.clone();
233                    let tracker_handle = JoinTask::new(async move {
234                        this.track_resolved(epoch).await;
235                    });
236
237                    // Resolve the remaining validators
238                    self.resolve(peers, &peer_resolver).await;
239
240                    // Wait indefinitely until this future is cancelled
241                    tracker_handle.await;
242                } => {}
243            }
244        }
245    }
246
247    fn prepare_peers(&self, peers: &mut FastHashSet<PeerId>, local_id: &PeerId) -> usize {
248        // Remove us from the list of validators
249        peers.remove(local_id);
250
251        metrics::gauge!("tycho_core_overlay_client_validators_to_resolve").set(peers.len() as f64);
252
253        // Increment the epoch to ensure that the background task will not overwrite the newest list
254        let epoch = {
255            let mut current_epoch = self.current_epoch.lock();
256            *current_epoch += 1;
257            *current_epoch
258        };
259
260        tracing::debug!(epoch, ?peers, "preparing validators");
261
262        // Filter targets
263        {
264            let targets = self.targets.load_full();
265            // changed will be set to true if ANY validator is not in the list of peers
266            let mut changed = false;
267
268            // list of validators which are in the list of peers(are alive)
269            let targets = targets
270                .iter()
271                .filter(|validator| {
272                    // NOTE: Don't remove from `peers` here since we need it for the `resolved` list
273                    let retain = peers.contains(&validator.inner.handle.peer_info().id);
274                    tracing::debug!(id = %validator.peer_id(), ?retain, "filtering validator");
275
276                    changed |= !retain;
277                    retain
278                })
279                .cloned()
280                .collect::<Vec<_>>();
281
282            let count = targets.len();
283            if changed {
284                self.targets.store(Arc::new(targets));
285            }
286
287            self.target_validators_gauge.set(count as f64);
288        }
289
290        tracing::debug!(epoch, "prepared validators");
291
292        // Remove old resolved validators and skip existing ones
293
294        {
295            let resolved = self.resolved.load_full();
296
297            tracing::debug!(epoch, ?resolved, ?peers, "resolving validators");
298
299            let mut changed = false;
300            let resolved = resolved
301                .iter()
302                .filter(|validator| {
303                    let retain = peers.remove(&validator.inner.handle.peer_info().id);
304                    changed |= !retain;
305                    retain
306                })
307                .cloned()
308                .collect::<Vec<_>>();
309
310            let count = resolved.len();
311
312            tracing::debug!(epoch, ?resolved, count, "resolved validators");
313            if changed {
314                self.resolved.store(Arc::new(resolved));
315            }
316
317            self.resolved_validators_gauge.set(count as f64);
318        }
319
320        // Return the new epoch
321        epoch
322    }
323
324    async fn resolve(&self, peers: FastHashSet<PeerId>, peer_resolver: &PeerResolver) {
325        tracing::debug!(?peers, "started resolving validators");
326
327        // Resolve all remaining new peers
328        let mut resolved = FuturesUnordered::new();
329        for peer_id in peers {
330            let peer = peer_resolver.insert(&peer_id, false);
331            resolved.push(async move { peer.wait_resolved().await });
332        }
333
334        while let Some(handle) = resolved.next().await {
335            let peer_id = handle.peer_info().id;
336            tracing::debug!(%peer_id, "resolved validator");
337
338            let mut resolved = self.resolved.load_full();
339            Arc::make_mut(&mut resolved).push(Validator {
340                inner: Arc::new(ValidatorInner { handle }),
341            });
342            let count = resolved.len();
343            self.resolved.store(resolved);
344            self.resolved_validators_gauge.set(count as f64);
345        }
346
347        tracing::debug!("resolved all validators");
348    }
349
350    #[tracing::instrument(skip(self))]
351    async fn track_resolved(&self, epoch: usize) {
352        use futures_util::StreamExt;
353        use rand::seq::SliceRandom;
354
355        tracing::debug!(epoch, "started resolving peers");
356        scopeguard::defer! {
357            tracing::debug!(epoch,"finished monitoring resolved validators");
358        }
359
360        let request = Request::from_tl(overlay::Ping);
361
362        let max_validators = self.config.keep;
363
364        let mut interval = tokio::time::interval(self.config.ping_interval);
365        loop {
366            interval.tick().await;
367
368            // Load a snapshot of the resolved validators list
369            let mut resolved = Arc::unwrap_or_clone(self.resolved.load_full());
370
371            // Remove definitely dead validators
372            let now = tycho_util::time::now_sec();
373            resolved.retain(|validator| !validator.is_expired(now));
374
375            // Shuffle the list of possibly alive validators
376            resolved.shuffle(&mut rand::rng());
377
378            let spawn_ping = |validator: Validator| {
379                let network = self.network.clone();
380                let overlay = self.overlay.clone();
381                let request = request.clone();
382                let ping_timeout = self.config.ping_timeout;
383
384                JoinTask::new(async move {
385                    let _histogram =
386                        HistogramGuard::begin("tycho_core_overlay_client_validator_ping_time");
387
388                    let peer_id = validator.peer_id();
389                    let res = tokio::time::timeout(
390                        ping_timeout,
391                        overlay.query(&network, &peer_id, request),
392                    )
393                    .await;
394
395                    match res {
396                        Ok(Ok(res)) => match res.parse_tl::<overlay::Pong>() {
397                            Ok(_) => Some(validator),
398                            Err(e) => {
399                                tracing::debug!(%peer_id, "received an invalid ping response: {e}");
400                                None
401                            }
402                        },
403                        Ok(Err(e)) => {
404                            tracing::debug!(%peer_id, "failed to ping validator: {e}");
405                            None
406                        }
407                        Err(_) => {
408                            tracing::debug!(%peer_id, "failed to ping validator: timeout");
409                            None
410                        }
411                    }
412                })
413            };
414
415            let mut targets = Vec::with_capacity(max_validators);
416
417            let mut resolved = resolved.into_iter();
418
419            // Spawn initial `max_validators` ping tasks
420            let mut futures = resolved
421                .by_ref()
422                .map(spawn_ping)
423                .take(max_validators)
424                .collect::<FuturesUnordered<_>>();
425
426            // Collect successful ping results and spawn new if needed
427            while let Some(res) = futures.next().await {
428                match res {
429                    // Use validator if the ping was successful
430                    Some(validator) => {
431                        targets.push(validator);
432                        if targets.len() >= max_validators {
433                            break;
434                        }
435                    }
436                    None => match resolved.next() {
437                        // If the ping failed, try the next validator
438                        Some(validator) => futures.push(spawn_ping(validator)),
439                        // Leave it as is if there are no more validators
440                        None => break,
441                    },
442                }
443            }
444
445            let count = targets.len();
446
447            // Only update the targets list if the epoch hasn't changed
448            {
449                let current_epoch = self.current_epoch.lock();
450                if *current_epoch == epoch {
451                    // NOTE: The list is updated in the guards's scope to ensure that the new list will wait
452                    self.targets.store(Arc::new(targets));
453                } else {
454                    return;
455                }
456            }
457
458            self.target_validators_gauge.set(count as f64);
459
460            // Done
461            tracing::info!(epoch, "updated current validators list");
462        }
463    }
464}
465
// Unbounded channel halves used to pass new validator peer sets to the worker task.
type PeersTx = mpsc::UnboundedSender<FastHashSet<PeerId>>;
type PeersRx = mpsc::UnboundedReceiver<FastHashSet<PeerId>>;