tycho_core/overlay_client/
validators.rs1use std::borrow::Borrow;
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use futures_util::StreamExt;
6use futures_util::stream::FuturesUnordered;
7use tokio::sync::mpsc;
8use tokio::task::AbortHandle;
9use tycho_network::{KnownPeerHandle, Network, PeerId, PeerResolver, PublicOverlay, Request};
10use tycho_types::models::ValidatorSet;
11use tycho_util::FastHashSet;
12use tycho_util::futures::JoinTask;
13use tycho_util::metrics::HistogramGuard;
14
15use crate::block_strider::{BlockSubscriber, BlockSubscriberContext};
16use crate::overlay_client::config::ValidatorsConfig;
17use crate::proto::overlay;
18
19pub trait ValidatorSetPeers {
20 fn get_peers(&self) -> FastHashSet<PeerId>;
21}
22
23impl ValidatorSetPeers for ValidatorSet {
24 fn get_peers(&self) -> FastHashSet<PeerId> {
25 self.list.iter().map(|x| PeerId(x.public_key.0)).collect()
26 }
27}
28
29impl<T: Borrow<PeerId>> ValidatorSetPeers for [T] {
30 fn get_peers(&self) -> FastHashSet<PeerId> {
31 self.iter().map(|x| *x.borrow()).collect()
32 }
33}
34
35impl<T: Borrow<PeerId>> ValidatorSetPeers for Vec<T> {
36 fn get_peers(&self) -> FastHashSet<PeerId> {
37 self.iter().map(|x| *x.borrow()).collect()
38 }
39}
40
41impl ValidatorSetPeers for FastHashSet<PeerId> {
42 fn get_peers(&self) -> FastHashSet<PeerId> {
43 self.clone()
44 }
45}
46
47#[derive(Clone)]
48pub struct ValidatorsResolver {
49 inner: Arc<Inner>,
50}
51
52impl ValidatorsResolver {
53 pub fn new(network: Network, overlay: PublicOverlay, config: ValidatorsConfig) -> Self {
54 let (peers_tx, peers_rx) = mpsc::unbounded_channel();
55
56 let peer_resolver = overlay.peer_resolver().clone();
57
58 let validators = Arc::new(Validators {
59 config,
60 resolved: Default::default(),
61 targets: Default::default(),
62 network,
63 overlay,
64 current_epoch: Default::default(),
65 target_validators_gauge: metrics::gauge!("tycho_core_overlay_client_target_validators"),
66 resolved_validators_gauge: metrics::gauge!(
67 "tycho_core_overlay_client_resolved_validators"
68 ),
69 });
70
71 let resolver_worker_handle = tokio::spawn({
72 let validators = validators.clone();
73 async move {
74 if let Some(peer_resolver) = peer_resolver {
75 validators.listen(peers_rx, peer_resolver).await;
76 }
77 }
78 });
79
80 Self {
81 inner: Arc::new(Inner {
82 validators,
83 peers_tx,
84
85 resolver_worker_handle: resolver_worker_handle.abort_handle(),
86 }),
87 }
88 }
89
90 pub fn update_validator_set<T: ValidatorSetPeers>(&self, vset: &T) {
91 let new_peers = vset.get_peers();
92 self.inner.peers_tx.send(new_peers).ok();
93 }
94
95 pub fn get_broadcast_targets(&self) -> Arc<Vec<Validator>> {
96 self.inner.validators.targets.load_full()
97 }
98}
99
100impl BlockSubscriber for ValidatorsResolver {
101 type Prepared = ();
102 type PrepareBlockFut<'a> = futures_util::future::Ready<anyhow::Result<()>>;
103 type HandleBlockFut<'a> = futures_util::future::Ready<anyhow::Result<()>>;
104
105 fn prepare_block<'a>(&'a self, _: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
106 futures_util::future::ready(Ok(()))
107 }
108
109 fn handle_block<'a>(
110 &'a self,
111 cx: &'a BlockSubscriberContext,
112 _: Self::Prepared,
113 ) -> Self::HandleBlockFut<'a> {
114 if !cx.is_key_block {
115 return futures_util::future::ready(Ok(()));
116 }
117 tracing::info!("updating validators");
118 let config = match cx.block.load_custom() {
119 Ok(extra) => &extra.config,
120 Err(e) => {
121 return futures_util::future::ready(Err(anyhow::anyhow!(
122 "failed to load mc block extra: {e:?}"
123 )));
124 }
125 };
126
127 if let Some(config) = config {
128 match config.get_current_validator_set() {
129 Ok(vset) => self.update_validator_set(&vset),
130 Err(e) => {
131 tracing::error!("failed to get validator set from blockchain config: {e:?}");
132 }
133 }
134 }
135
136 futures_util::future::ready(Ok(()))
137 }
138}
139
140struct Inner {
141 validators: Arc<Validators>,
142 peers_tx: PeersTx,
143
144 resolver_worker_handle: AbortHandle,
145}
146
147impl Drop for Inner {
148 fn drop(&mut self) {
149 tracing::info!("stopping validators resolver");
150 self.resolver_worker_handle.abort();
151 }
152}
153
154#[derive(Clone)]
155pub struct Validator {
156 inner: Arc<ValidatorInner>,
157}
158
159struct ValidatorInner {
160 handle: KnownPeerHandle,
161}
162
163impl Validator {
164 pub fn peer_id(&self) -> PeerId {
165 self.inner.handle.peer_info().id
166 }
167
168 pub fn is_expired(&self, now: u32) -> bool {
169 const NEW_THRESHOLD: u32 = 1800; let peer_info = self.inner.handle.peer_info();
172 let is_quite_old = peer_info.created_at + NEW_THRESHOLD < now;
173 is_quite_old || peer_info.expires_at < now
174 }
175}
176
177impl std::fmt::Debug for Validator {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 f.debug_struct("Validator")
180 .field("peer_id", &self.peer_id())
181 .finish()
182 }
183}
184
185struct Validators {
186 config: ValidatorsConfig,
187
188 resolved: ArcSwap<Vec<Validator>>,
190
191 targets: ArcSwap<Vec<Validator>>,
193
194 network: Network,
195 overlay: PublicOverlay,
196
197 current_epoch: parking_lot::Mutex<usize>,
199
200 target_validators_gauge: metrics::Gauge,
201 resolved_validators_gauge: metrics::Gauge,
202}
203
204impl Validators {
205 #[tracing::instrument(name = "resolve_validators", skip_all)]
206 async fn listen(self: &Arc<Self>, mut peers_rx: PeersRx, peer_resolver: PeerResolver) {
207 tracing::info!("started");
208 scopeguard::defer! { tracing::info!("finished"); };
209
210 let mut current_peers = None;
211
212 let local_id = peer_resolver.dht_service().local_id();
213
214 loop {
215 tokio::select! {
216 maybe_peers = peers_rx.recv() => {
217 match maybe_peers {
218 Some(peers) => {
219 current_peers = Some(peers);
220 }
221 None => break,
222 }
223 }
224 _ = async {
225 let Some(mut peers) = current_peers.take() else {
226 futures_util::future::pending().await
227 };
228
229 let epoch = self.prepare_peers(&mut peers, local_id);
230
231 let this = self.clone();
233 let tracker_handle = JoinTask::new(async move {
234 this.track_resolved(epoch).await;
235 });
236
237 self.resolve(peers, &peer_resolver).await;
239
240 tracker_handle.await;
242 } => {}
243 }
244 }
245 }
246
247 fn prepare_peers(&self, peers: &mut FastHashSet<PeerId>, local_id: &PeerId) -> usize {
248 peers.remove(local_id);
250
251 metrics::gauge!("tycho_core_overlay_client_validators_to_resolve").set(peers.len() as f64);
252
253 let epoch = {
255 let mut current_epoch = self.current_epoch.lock();
256 *current_epoch += 1;
257 *current_epoch
258 };
259
260 tracing::debug!(epoch, ?peers, "preparing validators");
261
262 {
264 let targets = self.targets.load_full();
265 let mut changed = false;
267
268 let targets = targets
270 .iter()
271 .filter(|validator| {
272 let retain = peers.contains(&validator.inner.handle.peer_info().id);
274 tracing::debug!(id = %validator.peer_id(), ?retain, "filtering validator");
275
276 changed |= !retain;
277 retain
278 })
279 .cloned()
280 .collect::<Vec<_>>();
281
282 let count = targets.len();
283 if changed {
284 self.targets.store(Arc::new(targets));
285 }
286
287 self.target_validators_gauge.set(count as f64);
288 }
289
290 tracing::debug!(epoch, "prepared validators");
291
292 {
295 let resolved = self.resolved.load_full();
296
297 tracing::debug!(epoch, ?resolved, ?peers, "resolving validators");
298
299 let mut changed = false;
300 let resolved = resolved
301 .iter()
302 .filter(|validator| {
303 let retain = peers.remove(&validator.inner.handle.peer_info().id);
304 changed |= !retain;
305 retain
306 })
307 .cloned()
308 .collect::<Vec<_>>();
309
310 let count = resolved.len();
311
312 tracing::debug!(epoch, ?resolved, count, "resolved validators");
313 if changed {
314 self.resolved.store(Arc::new(resolved));
315 }
316
317 self.resolved_validators_gauge.set(count as f64);
318 }
319
320 epoch
322 }
323
324 async fn resolve(&self, peers: FastHashSet<PeerId>, peer_resolver: &PeerResolver) {
325 tracing::debug!(?peers, "started resolving validators");
326
327 let mut resolved = FuturesUnordered::new();
329 for peer_id in peers {
330 let peer = peer_resolver.insert(&peer_id, false);
331 resolved.push(async move { peer.wait_resolved().await });
332 }
333
334 while let Some(handle) = resolved.next().await {
335 let peer_id = handle.peer_info().id;
336 tracing::debug!(%peer_id, "resolved validator");
337
338 let mut resolved = self.resolved.load_full();
339 Arc::make_mut(&mut resolved).push(Validator {
340 inner: Arc::new(ValidatorInner { handle }),
341 });
342 let count = resolved.len();
343 self.resolved.store(resolved);
344 self.resolved_validators_gauge.set(count as f64);
345 }
346
347 tracing::debug!("resolved all validators");
348 }
349
350 #[tracing::instrument(skip(self))]
351 async fn track_resolved(&self, epoch: usize) {
352 use futures_util::StreamExt;
353 use rand::seq::SliceRandom;
354
355 tracing::debug!(epoch, "started resolving peers");
356 scopeguard::defer! {
357 tracing::debug!(epoch,"finished monitoring resolved validators");
358 }
359
360 let request = Request::from_tl(overlay::Ping);
361
362 let max_validators = self.config.keep;
363
364 let mut interval = tokio::time::interval(self.config.ping_interval);
365 loop {
366 interval.tick().await;
367
368 let mut resolved = Arc::unwrap_or_clone(self.resolved.load_full());
370
371 let now = tycho_util::time::now_sec();
373 resolved.retain(|validator| !validator.is_expired(now));
374
375 resolved.shuffle(&mut rand::rng());
377
378 let spawn_ping = |validator: Validator| {
379 let network = self.network.clone();
380 let overlay = self.overlay.clone();
381 let request = request.clone();
382 let ping_timeout = self.config.ping_timeout;
383
384 JoinTask::new(async move {
385 let _histogram =
386 HistogramGuard::begin("tycho_core_overlay_client_validator_ping_time");
387
388 let peer_id = validator.peer_id();
389 let res = tokio::time::timeout(
390 ping_timeout,
391 overlay.query(&network, &peer_id, request),
392 )
393 .await;
394
395 match res {
396 Ok(Ok(res)) => match res.parse_tl::<overlay::Pong>() {
397 Ok(_) => Some(validator),
398 Err(e) => {
399 tracing::debug!(%peer_id, "received an invalid ping response: {e}");
400 None
401 }
402 },
403 Ok(Err(e)) => {
404 tracing::debug!(%peer_id, "failed to ping validator: {e}");
405 None
406 }
407 Err(_) => {
408 tracing::debug!(%peer_id, "failed to ping validator: timeout");
409 None
410 }
411 }
412 })
413 };
414
415 let mut targets = Vec::with_capacity(max_validators);
416
417 let mut resolved = resolved.into_iter();
418
419 let mut futures = resolved
421 .by_ref()
422 .map(spawn_ping)
423 .take(max_validators)
424 .collect::<FuturesUnordered<_>>();
425
426 while let Some(res) = futures.next().await {
428 match res {
429 Some(validator) => {
431 targets.push(validator);
432 if targets.len() >= max_validators {
433 break;
434 }
435 }
436 None => match resolved.next() {
437 Some(validator) => futures.push(spawn_ping(validator)),
439 None => break,
441 },
442 }
443 }
444
445 let count = targets.len();
446
447 {
449 let current_epoch = self.current_epoch.lock();
450 if *current_epoch == epoch {
451 self.targets.store(Arc::new(targets));
453 } else {
454 return;
455 }
456 }
457
458 self.target_validators_gauge.set(count as f64);
459
460 tracing::info!(epoch, "updated current validators list");
462 }
463 }
464}
465
466type PeersTx = mpsc::UnboundedSender<FastHashSet<PeerId>>;
467type PeersRx = mpsc::UnboundedReceiver<FastHashSet<PeerId>>;