casper_node/
utils.rs

1//! Various functions that are not limited to a particular module, but are too small to warrant
2//! being factored out into standalone crates.
3
4mod block_signatures;
5pub(crate) mod chain_specification;
6pub(crate) mod config_specification;
7mod display_error;
8pub(crate) mod ds;
9mod external;
10pub(crate) mod fmt_limit;
11pub(crate) mod opt_display;
12#[cfg(target_os = "linux")]
13pub(crate) mod rlimit;
14pub(crate) mod round_robin;
15pub(crate) mod specimen;
16pub(crate) mod umask;
17pub mod work_queue;
18
19use std::{
20    fmt::{self, Debug, Display, Formatter},
21    io,
22    net::{SocketAddr, ToSocketAddrs},
23    ops::{Add, BitXorAssign, Div},
24    path::{Path, PathBuf},
25    sync::atomic::{AtomicBool, Ordering},
26    time::{Instant, SystemTime},
27};
28
29#[cfg(test)]
30use std::{any, sync::Arc, time::Duration};
31
32use datasize::DataSize;
33use hyper::server::{conn::AddrIncoming, Builder, Server};
34#[cfg(test)]
35use once_cell::sync::Lazy;
36use prometheus::{self, Histogram, HistogramOpts, Registry};
37use serde::Serialize;
38use thiserror::Error;
39use tracing::{error, warn};
40
41use crate::types::NodeId;
42pub(crate) use block_signatures::{check_sufficient_block_signatures, BlockSignatureError};
43pub(crate) use display_error::display_error;
44#[cfg(test)]
45pub(crate) use external::RESOURCES_PATH;
46pub use external::{External, LoadError, Loadable};
47pub(crate) use round_robin::WeightedRoundRobin;
48
49/// DNS resolution error.
50#[derive(Debug, Error)]
51#[error("could not resolve `{address}`: {kind}")]
52pub struct ResolveAddressError {
53    /// Address that failed to resolve.
54    address: String,
55    /// Reason for resolution failure.
56    kind: ResolveAddressErrorKind,
57}
58
59/// DNS resolution error kind.
60#[derive(Debug)]
61enum ResolveAddressErrorKind {
62    /// Resolve returned an error.
63    ErrorResolving(io::Error),
64    /// Resolution did not yield any address.
65    NoAddressFound,
66}
67
68impl Display for ResolveAddressErrorKind {
69    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
70        match self {
71            ResolveAddressErrorKind::ErrorResolving(err) => {
72                write!(f, "could not run dns resolution: {}", err)
73            }
74            ResolveAddressErrorKind::NoAddressFound => {
75                write!(f, "no addresses found")
76            }
77        }
78    }
79}
80
81/// Backport of `Result::flatten`, see <https://github.com/rust-lang/rust/issues/70142>.
82pub trait FlattenResult {
83    /// The output of the flattening operation.
84    type Output;
85
86    /// Flattens one level.
87    ///
88    /// This function is named `flatten_result` instead of `flatten` to avoid name collisions once
89    /// `Result::flatten` stabilizes.
90    fn flatten_result(self) -> Self::Output;
91}
92
93impl<T, E> FlattenResult for Result<Result<T, E>, E> {
94    type Output = Result<T, E>;
95
96    #[inline]
97    fn flatten_result(self) -> Self::Output {
98        match self {
99            Ok(Ok(v)) => Ok(v),
100            Ok(Err(e)) => Err(e),
101            Err(e) => Err(e),
102        }
103    }
104}
105
106/// Parses a network address from a string, with DNS resolution.
107pub(crate) fn resolve_address(address: &str) -> Result<SocketAddr, ResolveAddressError> {
108    address
109        .to_socket_addrs()
110        .map_err(|err| ResolveAddressError {
111            address: address.to_string(),
112            kind: ResolveAddressErrorKind::ErrorResolving(err),
113        })?
114        .next()
115        .ok_or_else(|| ResolveAddressError {
116            address: address.to_string(),
117            kind: ResolveAddressErrorKind::NoAddressFound,
118        })
119}
120
121/// An error starting one of the HTTP servers.
122#[derive(Debug, Error)]
123pub(crate) enum ListeningError {
124    /// Failed to resolve address.
125    #[error("failed to resolve network address: {0}")]
126    ResolveAddress(ResolveAddressError),
127
128    /// Failed to listen.
129    #[error("failed to listen on {address}: {error}")]
130    Listen {
131        /// The address attempted to listen on.
132        address: SocketAddr,
133        /// The failure reason.
134        error: Box<dyn std::error::Error + Send + Sync>,
135    },
136}
137
138pub(crate) fn start_listening(address: &str) -> Result<Builder<AddrIncoming>, ListeningError> {
139    let address = resolve_address(address).map_err(|error| {
140        warn!(%error, %address, "failed to start HTTP server, cannot parse address");
141        ListeningError::ResolveAddress(error)
142    })?;
143
144    Server::try_bind(&address).map_err(|error| {
145        warn!(%error, %address, "failed to start HTTP server");
146        ListeningError::Listen {
147            address,
148            error: Box::new(error),
149        }
150    })
151}
152
153/// Moves a value to the heap and then forgets about, leaving only a static reference behind.
154#[inline]
155pub(crate) fn leak<T>(value: T) -> &'static T {
156    Box::leak(Box::new(value))
157}
158
159/// A flag shared across multiple subsystem.
160#[derive(Copy, Clone, DataSize, Debug)]
161pub(crate) struct SharedFlag(&'static AtomicBool);
162
163impl SharedFlag {
164    /// Creates a new shared flag.
165    ///
166    /// The flag is initially not set.
167    pub(crate) fn new() -> Self {
168        SharedFlag(leak(AtomicBool::new(false)))
169    }
170
171    /// Checks whether the flag is set.
172    pub(crate) fn is_set(self) -> bool {
173        self.0.load(Ordering::SeqCst)
174    }
175
176    /// Set the flag.
177    pub(crate) fn set(self) {
178        self.0.store(true, Ordering::SeqCst);
179    }
180
181    /// Returns a shared instance of the flag for testing.
182    ///
183    /// The returned flag should **never** have `set` be called upon it.
184    #[cfg(test)]
185    pub(crate) fn global_shared() -> Self {
186        static SHARED_FLAG: Lazy<SharedFlag> = Lazy::new(SharedFlag::new);
187
188        *SHARED_FLAG
189    }
190}
191
192impl Default for SharedFlag {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198/// With-directory context.
199///
200/// Associates a type with a "working directory".
201#[derive(Clone, DataSize, Debug)]
202pub struct WithDir<T> {
203    dir: PathBuf,
204    value: T,
205}
206
207impl<T> WithDir<T> {
208    /// Creates a new with-directory context.
209    pub fn new<P: Into<PathBuf>>(path: P, value: T) -> Self {
210        WithDir {
211            dir: path.into(),
212            value,
213        }
214    }
215
216    /// Returns a reference to the inner path.
217    pub fn dir(&self) -> &Path {
218        self.dir.as_ref()
219    }
220
221    /// Deconstructs a with-directory context.
222    pub(crate) fn into_parts(self) -> (PathBuf, T) {
223        (self.dir, self.value)
224    }
225
226    /// Maps an internal value onto a reference.
227    pub fn map_ref<U, F: FnOnce(&T) -> U>(&self, f: F) -> WithDir<U> {
228        WithDir {
229            dir: self.dir.clone(),
230            value: f(&self.value),
231        }
232    }
233
234    /// Get a reference to the inner value.
235    pub fn value(&self) -> &T {
236        &self.value
237    }
238
239    /// Get a mutable reference to the inner value.
240    pub fn value_mut(&mut self) -> &mut T {
241        &mut self.value
242    }
243
244    /// Adds `self.dir` as a parent if `path` is relative, otherwise returns `path` unchanged.
245    pub fn with_dir(&self, path: PathBuf) -> PathBuf {
246        if path.is_relative() {
247            self.dir.join(path)
248        } else {
249            path
250        }
251    }
252}
253
254/// The source of a piece of data.
255#[derive(Clone, Debug, Serialize)]
256pub(crate) enum Source {
257    /// A peer with the wrapped ID.
258    PeerGossiped(NodeId),
259    /// A peer with the wrapped ID.
260    Peer(NodeId),
261    /// A client.
262    Client,
263    /// A client via the speculative_exec server.
264    SpeculativeExec,
265    /// This node.
266    Ourself,
267}
268
269impl Source {
270    #[allow(clippy::wrong_self_convention)]
271    pub(crate) fn is_client(&self) -> bool {
272        match self {
273            Source::Client | Source::SpeculativeExec => true,
274            Source::PeerGossiped(_) | Source::Peer(_) | Source::Ourself => false,
275        }
276    }
277
278    /// If `self` represents a peer, returns its ID, otherwise returns `None`.
279    pub(crate) fn node_id(&self) -> Option<NodeId> {
280        match self {
281            Source::Peer(node_id) | Source::PeerGossiped(node_id) => Some(*node_id),
282            Source::Client | Source::SpeculativeExec | Source::Ourself => None,
283        }
284    }
285}
286
287impl Display for Source {
288    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
289        match self {
290            Source::PeerGossiped(node_id) | Source::Peer(node_id) => {
291                Display::fmt(node_id, formatter)
292            }
293            Source::Client => write!(formatter, "client"),
294            Source::SpeculativeExec => write!(formatter, "client (speculative exec)"),
295            Source::Ourself => write!(formatter, "ourself"),
296        }
297    }
298}
299
300/// Divides `numerator` by `denominator` and rounds to the closest integer (round half down).
301///
302/// `numerator + denominator / 2` must not overflow, and `denominator` must not be zero.
303pub(crate) fn div_round<T>(numerator: T, denominator: T) -> T
304where
305    T: Add<Output = T> + Div<Output = T> + From<u8> + Copy,
306{
307    (numerator + denominator / T::from(2)) / denominator
308}
309
310/// Creates a prometheus Histogram and registers it.
311pub(crate) fn register_histogram_metric(
312    registry: &Registry,
313    metric_name: &str,
314    metric_help: &str,
315    buckets: Vec<f64>,
316) -> Result<Histogram, prometheus::Error> {
317    let histogram_opts = HistogramOpts::new(metric_name, metric_help).buckets(buckets);
318    let histogram = Histogram::with_opts(histogram_opts)?;
319    registry.register(Box::new(histogram.clone()))?;
320    Ok(histogram)
321}
322
323/// Unregisters a metric from the Prometheus registry.
324#[macro_export]
325macro_rules! unregister_metric {
326    ($registry:expr, $metric:expr) => {
327        $registry
328            .unregister(Box::new($metric.clone()))
329            .unwrap_or_else(|_| {
330                tracing::error!(
331                    "unregistering {} failed: was not registered",
332                    stringify!($metric)
333                )
334            });
335    };
336}
337
338/// XORs two byte sequences.
339///
340/// # Panics
341///
342/// Panics if `lhs` and `rhs` are not of equal length.
343#[inline]
344pub(crate) fn xor(lhs: &mut [u8], rhs: &[u8]) {
345    // Implementing SIMD support is left as an exercise for the reader.
346    assert_eq!(lhs.len(), rhs.len(), "xor inputs should have equal length");
347    lhs.iter_mut()
348        .zip(rhs.iter())
349        .for_each(|(sb, &cb)| sb.bitxor_assign(cb));
350}
351
352/// Wait until all strong references for a particular arc have been dropped.
353///
354/// Downgrades and immediately drops the `Arc`, keeping only a weak reference. The reference will
355/// then be polled `attempts` times, unless it has a strong reference count of 0.
356///
357/// Returns whether or not `arc` has zero strong references left.
358///
359/// # Note
360///
361/// Using this function is usually a potential architectural issue and it should be used very
362/// sparingly. Consider introducing a different access pattern for the value under `Arc`.
363#[cfg(test)]
364pub(crate) async fn wait_for_arc_drop<T>(
365    arc: Arc<T>,
366    attempts: usize,
367    retry_delay: Duration,
368) -> bool {
369    // Ensure that if we do hold the last reference, we are now going to 0.
370    let weak = Arc::downgrade(&arc);
371    drop(arc);
372
373    for _ in 0..attempts {
374        let strong_count = weak.strong_count();
375
376        if strong_count == 0 {
377            // Everything has been dropped, we are done.
378            return true;
379        }
380
381        tokio::time::sleep(retry_delay).await;
382    }
383
384    error!(
385        attempts, ?retry_delay, ty=%any::type_name::<T>(),
386        "failed to clean up shared reference"
387    );
388
389    false
390}
391
392/// An anchor for converting an `Instant` into a wall-clock (`SystemTime`) time.
393#[derive(Copy, Clone, Debug)]
394pub(crate) struct TimeAnchor {
395    /// The reference instant used for conversion.
396    now: Instant,
397    /// The reference wall-clock timestamp used for conversion.
398    wall_clock_now: SystemTime,
399}
400
401impl TimeAnchor {
402    /// Creates a new time anchor.
403    ///
404    /// Will take a sample of the monotonic clock and the current time and store it in the anchor.
405    pub(crate) fn now() -> Self {
406        TimeAnchor {
407            now: Instant::now(),
408            wall_clock_now: SystemTime::now(),
409        }
410    }
411
412    /// Converts a point in time from the monotonic clock to wall clock time, using this anchor.
413    #[inline]
414    pub(crate) fn convert(&self, then: Instant) -> SystemTime {
415        if then > self.now {
416            self.wall_clock_now + then.duration_since(self.now)
417        } else {
418            self.wall_clock_now - self.now.duration_since(then)
419        }
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use std::{sync::Arc, time::Duration};
426
427    use crate::utils::SharedFlag;
428
429    use super::{wait_for_arc_drop, xor};
430
431    #[test]
432    fn xor_works() {
433        let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];
434        let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11, 0x12, 0x23];
435        let xor_result = [0x47, 0x58, 0xae, 0x8e, 0x46, 0x61, 0xe9, 0xd7];
436
437        xor(&mut lhs, &rhs);
438
439        assert_eq!(lhs, xor_result);
440    }
441
442    #[test]
443    #[should_panic(expected = "equal length")]
444    fn xor_panics_on_uneven_inputs() {
445        let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];
446        let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11];
447
448        xor(&mut lhs, &rhs);
449    }
450
451    #[tokio::test]
452    async fn arc_drop_waits_for_drop() {
453        let retry_delay = Duration::from_millis(25);
454        let attempts = 15;
455
456        let arc = Arc::new(());
457
458        let arc_in_background = arc.clone();
459        let _weak_in_background = Arc::downgrade(&arc);
460
461        // At this point, the Arc has the following refernces:
462        //
463        // * main test task (`arc`, strong)
464        // * background strong reference (`arc_in_background`)
465        // * background weak reference (`weak_in_background`)
466
467        // Phase 1: waiting for the arc should fail, because there still is the background
468        // reference.
469        assert!(!wait_for_arc_drop(arc, attempts, retry_delay).await);
470
471        // We "restore" the arc from the background arc.
472        let arc = arc_in_background.clone();
473
474        // Add another "foreground" weak reference.
475        let weak = Arc::downgrade(&arc);
476
477        // Phase 2: Our background tasks drops its reference, now we should succeed.
478        drop(arc_in_background);
479        assert!(wait_for_arc_drop(arc, attempts, retry_delay).await);
480
481        // Immedetialy after, we should not be able to obtain a strong reference anymore.
482        // This test fails only if we have a race condition, so false positive tests are possible.
483        assert!(weak.upgrade().is_none());
484    }
485
486    #[test]
487    fn shared_flag_sanity_check() {
488        let flag = SharedFlag::new();
489        let copied = flag;
490
491        assert!(!flag.is_set());
492        assert!(!copied.is_set());
493        assert!(!flag.is_set());
494        assert!(!copied.is_set());
495
496        flag.set();
497
498        assert!(flag.is_set());
499        assert!(copied.is_set());
500        assert!(flag.is_set());
501        assert!(copied.is_set());
502    }
503}