mod block_signatures;
pub(crate) mod chain_specification;
pub(crate) mod config_specification;
mod display_error;
pub(crate) mod ds;
mod external;
pub(crate) mod fmt_limit;
pub(crate) mod opt_display;
#[cfg(target_os = "linux")]
pub(crate) mod rlimit;
pub(crate) mod round_robin;
pub(crate) mod specimen;
pub(crate) mod umask;
pub mod work_queue;
use std::{
fmt::{self, Debug, Display, Formatter},
io,
net::{SocketAddr, ToSocketAddrs},
ops::{Add, BitXorAssign, Div},
path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering},
time::{Instant, SystemTime},
};
#[cfg(test)]
use std::{any, sync::Arc, time::Duration};
use datasize::DataSize;
use hyper::server::{conn::AddrIncoming, Builder, Server};
#[cfg(test)]
use once_cell::sync::Lazy;
use prometheus::{self, Histogram, HistogramOpts, Registry};
use serde::Serialize;
use thiserror::Error;
use tracing::{error, warn};
use crate::types::NodeId;
pub(crate) use block_signatures::{check_sufficient_block_signatures, BlockSignatureError};
pub(crate) use display_error::display_error;
#[cfg(test)]
pub(crate) use external::RESOURCES_PATH;
pub use external::{External, LoadError, Loadable};
pub(crate) use round_robin::WeightedRoundRobin;
/// Error produced when a textual address cannot be turned into a socket address.
///
/// Renders as ``could not resolve `<address>`: <kind>``.
#[derive(Debug, Error)]
#[error("could not resolve `{address}`: {kind}")]
pub struct ResolveAddressError {
/// The address string that resolution was attempted on.
address: String,
/// The specific reason the resolution failed.
kind: ResolveAddressErrorKind,
}
/// The specific reason an address could not be resolved.
#[derive(Debug)]
enum ResolveAddressErrorKind {
    /// The resolver itself failed with an I/O error.
    ErrorResolving(io::Error),
    /// The resolver ran successfully but produced no addresses.
    NoAddressFound,
}

impl Display for ResolveAddressErrorKind {
    /// Writes a short, human-readable description of the failure reason.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::NoAddressFound => f.write_str("no addresses found"),
            Self::ErrorResolving(err) => write!(f, "could not run dns resolution: {}", err),
        }
    }
}
/// Collapses a doubly nested result into a single-level one.
pub trait FlattenResult {
    /// The type produced after flattening.
    type Output;

    /// Merges the outer and inner layers into one value of type `Self::Output`.
    fn flatten_result(self) -> Self::Output;
}

impl<T, E> FlattenResult for Result<Result<T, E>, E> {
    type Output = Result<T, E>;

    /// Returns the inner result when the outer layer is `Ok`, otherwise forwards the outer error.
    #[inline]
    fn flatten_result(self) -> Self::Output {
        // `?` propagates an outer `Err` unchanged; what remains is exactly the inner `Result`.
        self?
    }
}
/// Resolves `address` to a socket address, returning the first candidate found.
///
/// # Errors
///
/// Returns a `ResolveAddressError` carrying a copy of `address` when the lookup itself fails or
/// when it succeeds but yields no addresses at all.
pub(crate) fn resolve_address(address: &str) -> Result<SocketAddr, ResolveAddressError> {
    // Both failure paths report the same input string and differ only in the kind.
    let failure = |kind| ResolveAddressError {
        address: address.to_string(),
        kind,
    };

    match address.to_socket_addrs() {
        Err(err) => Err(failure(ResolveAddressErrorKind::ErrorResolving(err))),
        Ok(mut candidates) => candidates
            .next()
            .ok_or_else(|| failure(ResolveAddressErrorKind::NoAddressFound)),
    }
}
/// Errors that can occur while preparing to listen on a network address.
#[derive(Debug, Error)]
pub(crate) enum ListeningError {
/// The textual address could not be resolved to a socket address.
#[error("failed to resolve network address: {0}")]
ResolveAddress(ResolveAddressError),
/// Listening on the resolved socket address failed.
#[error("failed to listen on {address}: {error}")]
Listen {
/// The socket address on which listening was attempted.
address: SocketAddr,
/// The underlying error, boxed so that any error type can be stored.
error: Box<dyn std::error::Error + Send + Sync>,
},
}
pub(crate) fn start_listening(address: &str) -> Result<Builder<AddrIncoming>, ListeningError> {
let address = resolve_address(address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server, cannot parse address");
ListeningError::ResolveAddress(error)
})?;
Server::try_bind(&address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server");
ListeningError::Listen {
address,
error: Box::new(error),
}
})
}
/// Moves `value` to the heap and intentionally never frees it, yielding a `'static` reference.
///
/// The allocation is never reclaimed, so each call permanently consumes memory.
#[inline]
pub(crate) fn leak<T>(value: T) -> &'static T {
    let boxed = Box::new(value);
    Box::leak(boxed)
}
/// A boolean flag that can be raised once and observed through any of its copies.
///
/// The flag wraps a `'static` reference to an `AtomicBool`, so copying it is cheap and every copy
/// observes the same underlying value. The flag can only be set, never cleared.
#[derive(Copy, Clone, DataSize, Debug)]
pub(crate) struct SharedFlag(&'static AtomicBool);

impl SharedFlag {
    /// Creates a new, unset flag.
    ///
    /// The backing `AtomicBool` is leaked, so every call permanently allocates a small amount of
    /// memory.
    pub(crate) fn new() -> Self {
        let storage = leak(AtomicBool::new(false));
        Self(storage)
    }

    /// Returns whether the flag has been set.
    pub(crate) fn is_set(self) -> bool {
        let SharedFlag(inner) = self;
        inner.load(Ordering::SeqCst)
    }

    /// Sets the flag; all copies will observe it as set from now on.
    pub(crate) fn set(self) {
        let SharedFlag(inner) = self;
        inner.store(true, Ordering::SeqCst);
    }

    /// Returns a copy of a single lazily created flag shared by all callers (test builds only).
    #[cfg(test)]
    pub(crate) fn global_shared() -> Self {
        static GLOBAL_FLAG: Lazy<SharedFlag> = Lazy::new(SharedFlag::new);

        *GLOBAL_FLAG
    }
}

impl Default for SharedFlag {
    /// Equivalent to [`SharedFlag::new`].
    fn default() -> Self {
        SharedFlag::new()
    }
}
/// A value paired with a directory against which relative paths can be resolved.
#[derive(Clone, DataSize, Debug)]
pub struct WithDir<T> {
    /// The directory associated with `value`.
    dir: PathBuf,
    /// The wrapped value.
    value: T,
}

impl<T> WithDir<T> {
    /// Wraps `value` together with the directory `path`.
    pub fn new<P: Into<PathBuf>>(path: P, value: T) -> Self {
        let dir = path.into();
        Self { dir, value }
    }

    /// Returns the associated directory.
    pub fn dir(&self) -> &Path {
        &self.dir
    }

    /// Splits the wrapper into its directory and its value.
    pub(crate) fn into_parts(self) -> (PathBuf, T) {
        let Self { dir, value } = self;
        (dir, value)
    }

    /// Builds a new wrapper with the same directory and the result of applying `f` to the value.
    pub fn map_ref<U, F: FnOnce(&T) -> U>(&self, f: F) -> WithDir<U> {
        let dir = self.dir.clone();
        let value = f(&self.value);
        WithDir { dir, value }
    }

    /// Returns a shared reference to the wrapped value.
    pub fn value(&self) -> &T {
        let Self { value, .. } = self;
        value
    }

    /// Returns a mutable reference to the wrapped value.
    pub fn value_mut(&mut self) -> &mut T {
        let Self { value, .. } = self;
        value
    }

    /// Resolves `path` against the associated directory.
    ///
    /// Absolute paths are returned untouched; relative ones are joined onto the directory.
    pub fn with_dir(&self, path: PathBuf) -> PathBuf {
        if path.is_absolute() {
            return path;
        }
        self.dir.join(path)
    }
}
/// Identifies where something originated from.
#[derive(Clone, Debug, Serialize)]
pub(crate) enum Source {
    /// Originated from the peer with the given node ID (gossiped, judging by the variant name).
    PeerGossiped(NodeId),
    /// Originated from the peer with the given node ID.
    Peer(NodeId),
    /// Originated from a client.
    Client,
    /// Originated from a client via speculative execution; treated as a client by `is_client`.
    SpeculativeExec,
    /// Originated from this node itself.
    Ourself,
}

impl Source {
    /// Returns `true` for the client-originated variants (`Client` and `SpeculativeExec`).
    #[allow(clippy::wrong_self_convention)]
    pub(crate) fn is_client(&self) -> bool {
        match self {
            Self::PeerGossiped(_) | Self::Peer(_) | Self::Ourself => false,
            Self::Client | Self::SpeculativeExec => true,
        }
    }

    /// Returns the node ID carried by the peer variants, or `None` for every other variant.
    pub(crate) fn node_id(&self) -> Option<NodeId> {
        match self {
            Self::Client | Self::SpeculativeExec | Self::Ourself => None,
            Self::PeerGossiped(id) | Self::Peer(id) => Some(*id),
        }
    }
}

impl Display for Source {
    /// Peer variants display as their node ID; the remaining variants use a fixed label.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::Ourself => formatter.write_str("ourself"),
            Self::SpeculativeExec => formatter.write_str("client (speculative exec)"),
            Self::Client => formatter.write_str("client"),
            Self::Peer(id) | Self::PeerGossiped(id) => Display::fmt(id, formatter),
        }
    }
}
/// Divides `numerator` by `denominator` after adding half of the denominator to the numerator.
///
/// For non-negative integers this rounds the quotient to the nearest integer, with exact halves
/// rounded up. The intermediate sum `numerator + denominator / 2` is computed in `T` and is not
/// guarded against overflow, and the division is the plain `/` of `T`, so the caller must supply
/// a non-zero `denominator` for integer types.
pub(crate) fn div_round<T>(numerator: T, denominator: T) -> T
where
    T: Add<Output = T> + Div<Output = T> + From<u8> + Copy,
{
    let half_denominator = denominator / T::from(2);
    let biased_numerator = numerator + half_denominator;
    biased_numerator / denominator
}
/// Creates a histogram with the given name, help text and buckets and registers it.
///
/// A clone of the histogram is handed to `registry`; the histogram itself is returned to the
/// caller.
///
/// # Errors
///
/// Returns the `prometheus::Error` raised either while constructing the histogram from its
/// options or while registering it with `registry`.
pub(crate) fn register_histogram_metric(
    registry: &Registry,
    metric_name: &str,
    metric_help: &str,
    buckets: Vec<f64>,
) -> Result<Histogram, prometheus::Error> {
    let options = HistogramOpts::new(metric_name, metric_help).buckets(buckets);
    let histogram = Histogram::with_opts(options)?;

    let collector = Box::new(histogram.clone());
    registry.register(collector)?;

    Ok(histogram)
}
/// Unregisters a clone of `$metric` from `$registry`.
///
/// Any error returned by `unregister` is not propagated; instead an error-level tracing event is
/// emitted that names the metric expression. The expansion ends in `;`, so the macro is meant to
/// be used as a statement.
#[macro_export]
macro_rules! unregister_metric {
($registry:expr, $metric:expr) => {
$registry
.unregister(Box::new($metric.clone()))
.unwrap_or_else(|_| {
// The error value itself is discarded; only the metric expression is logged.
tracing::error!(
"unregistering {} failed: was not registered",
stringify!($metric)
)
});
};
}
/// XORs every byte of `rhs` into the byte at the same position in `lhs`, in place.
///
/// # Panics
///
/// Panics if the two slices differ in length.
#[inline]
pub(crate) fn xor(lhs: &mut [u8], rhs: &[u8]) {
    assert_eq!(lhs.len(), rhs.len(), "xor inputs should have equal length");

    for (target, mask) in lhs.iter_mut().zip(rhs) {
        target.bitxor_assign(*mask);
    }
}
/// Drops `arc` and waits for every other strong reference to it to disappear.
///
/// The strong count is polled up to `attempts` times, sleeping `retry_delay` after each
/// unsuccessful poll, and then checked one final time so that the last delay is not wasted.
///
/// Returns `true` as soon as no strong references remain. If references are still alive after
/// the final check, an error is logged and `false` is returned.
#[cfg(test)]
pub(crate) async fn wait_for_arc_drop<T>(
    arc: Arc<T>,
    attempts: usize,
    retry_delay: Duration,
) -> bool {
    // Keep only a weak handle so that this function does not itself keep the value alive.
    let weak = Arc::downgrade(&arc);
    drop(arc);

    for _ in 0..attempts {
        if weak.strong_count() == 0 {
            return true;
        }

        tokio::time::sleep(retry_delay).await;
    }

    // A drop may have happened during the final sleep (or `attempts` may be zero, in which case
    // nothing has been checked yet), so look once more before reporting a failure.
    if weak.strong_count() == 0 {
        return true;
    }

    error!(
        attempts, ?retry_delay, ty=%any::type_name::<T>(),
        "failed to clean up shared reference"
    );

    false
}
/// A pair of readings, one from the `Instant` clock and one from the `SystemTime` clock, taken
/// back to back.
///
/// The pair serves as a reference point for translating `Instant`s into `SystemTime`s.
#[derive(Copy, Clone, Debug)]
pub(crate) struct TimeAnchor {
    /// The `Instant` reading taken when the anchor was created.
    now: Instant,
    /// The `SystemTime` reading taken right after `now`.
    wall_clock_now: SystemTime,
}

impl TimeAnchor {
    /// Creates an anchor from the current readings of both clocks.
    pub(crate) fn now() -> Self {
        let now = Instant::now();
        let wall_clock_now = SystemTime::now();
        Self {
            now,
            wall_clock_now,
        }
    }

    /// Translates `then` into a `SystemTime` by applying its offset from the anchor's `Instant`
    /// to the anchor's wall-clock reading.
    #[inline]
    pub(crate) fn convert(&self, then: Instant) -> SystemTime {
        if self.now >= then {
            // `then` lies at or before the anchor: step backwards on the wall clock.
            let behind = self.now.duration_since(then);
            self.wall_clock_now - behind
        } else {
            // `then` lies after the anchor: step forwards on the wall clock.
            let ahead = then.duration_since(self.now);
            self.wall_clock_now + ahead
        }
    }
}
// Unit tests for the free-standing helpers defined in this module.
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use crate::utils::SharedFlag;
use super::{wait_for_arc_drop, xor};
// XORing two equal-length arrays must yield the precomputed byte-wise result.
#[test]
fn xor_works() {
let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];
let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11, 0x12, 0x23];
let xor_result = [0x47, 0x58, 0xae, 0x8e, 0x46, 0x61, 0xe9, 0xd7];
xor(&mut lhs, &rhs);
assert_eq!(lhs, xor_result);
}
// Inputs of different lengths must trigger the length assertion inside `xor`.
#[test]
#[should_panic(expected = "equal length")]
fn xor_panics_on_uneven_inputs() {
let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];
let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11];
xor(&mut lhs, &rhs);
}
// Exercises both outcomes of `wait_for_arc_drop`; the order of clones and drops matters here.
#[tokio::test]
async fn arc_drop_waits_for_drop() {
let retry_delay = Duration::from_millis(25);
let attempts = 15;
let arc = Arc::new(());
let arc_in_background = arc.clone();
// A weak reference is held for the whole test; it must not affect either outcome.
let _weak_in_background = Arc::downgrade(&arc);
// `arc_in_background` is still a live strong reference, so waiting has to give up.
assert!(!wait_for_arc_drop(arc, attempts, retry_delay).await);
let arc = arc_in_background.clone();
let weak = Arc::downgrade(&arc);
// After this drop, the `arc` handed to `wait_for_arc_drop` is the only strong reference left.
drop(arc_in_background);
assert!(wait_for_arc_drop(arc, attempts, retry_delay).await);
assert!(weak.upgrade().is_none());
}
// A flag and its copy must always agree, both before and after the flag is set.
#[test]
fn shared_flag_sanity_check() {
let flag = SharedFlag::new();
let copied = flag;
assert!(!flag.is_set());
assert!(!copied.is_set());
// Read a second time to check that reading the flag does not change it.
assert!(!flag.is_set());
assert!(!copied.is_set());
flag.set();
assert!(flag.is_set());
assert!(copied.is_set());
// Read a second time to check that the flag stays set.
assert!(flag.is_set());
assert!(copied.is_set());
}
}