1mod block_signatures;
5pub(crate) mod chain_specification;
6pub(crate) mod config_specification;
7mod display_error;
8pub(crate) mod ds;
9mod external;
10pub(crate) mod fmt_limit;
11pub(crate) mod opt_display;
12#[cfg(target_os = "linux")]
13pub(crate) mod rlimit;
14pub(crate) mod round_robin;
15pub(crate) mod specimen;
16pub(crate) mod umask;
17pub mod work_queue;
18
19use std::{
20 fmt::{self, Debug, Display, Formatter},
21 io,
22 net::{SocketAddr, ToSocketAddrs},
23 ops::{Add, BitXorAssign, Div},
24 path::{Path, PathBuf},
25 sync::atomic::{AtomicBool, Ordering},
26 time::{Instant, SystemTime},
27};
28
29#[cfg(test)]
30use std::{any, sync::Arc, time::Duration};
31
32use datasize::DataSize;
33use hyper::server::{conn::AddrIncoming, Builder, Server};
34#[cfg(test)]
35use once_cell::sync::Lazy;
36use prometheus::{self, Histogram, HistogramOpts, Registry};
37use serde::Serialize;
38use thiserror::Error;
39use tracing::{error, warn};
40
41use crate::types::NodeId;
42pub(crate) use block_signatures::{check_sufficient_block_signatures, BlockSignatureError};
43pub(crate) use display_error::display_error;
44#[cfg(test)]
45pub(crate) use external::RESOURCES_PATH;
46pub use external::{External, LoadError, Loadable};
47pub(crate) use round_robin::WeightedRoundRobin;
48
49#[derive(Debug, Error)]
51#[error("could not resolve `{address}`: {kind}")]
52pub struct ResolveAddressError {
53 address: String,
55 kind: ResolveAddressErrorKind,
57}
58
/// The specific way in which an address resolution failed.
#[derive(Debug)]
enum ResolveAddressErrorKind {
    /// The resolution call itself returned an I/O error.
    ErrorResolving(io::Error),
    /// Resolution completed, but produced no addresses at all.
    NoAddressFound,
}

impl Display for ResolveAddressErrorKind {
    /// Writes a short, human-readable description of the failure kind.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::NoAddressFound => f.write_str("no addresses found"),
            Self::ErrorResolving(cause) => {
                write!(f, "could not run dns resolution: {}", cause)
            }
        }
    }
}
80
/// Collapses a nested value into a single layer.
pub trait FlattenResult {
    /// The type obtained after flattening.
    type Output;

    /// Consumes `self` and returns its flattened form.
    fn flatten_result(self) -> Self::Output;
}

impl<T, E> FlattenResult for Result<Result<T, E>, E> {
    type Output = Result<T, E>;

    /// Returns the inner result when the outer one is `Ok`, otherwise the outer error.
    #[inline]
    fn flatten_result(self) -> Self::Output {
        // An outer `Err` short-circuits; an outer `Ok` hands back the inner result untouched.
        self.and_then(|inner| inner)
    }
}
105
106pub(crate) fn resolve_address(address: &str) -> Result<SocketAddr, ResolveAddressError> {
108 address
109 .to_socket_addrs()
110 .map_err(|err| ResolveAddressError {
111 address: address.to_string(),
112 kind: ResolveAddressErrorKind::ErrorResolving(err),
113 })?
114 .next()
115 .ok_or_else(|| ResolveAddressError {
116 address: address.to_string(),
117 kind: ResolveAddressErrorKind::NoAddressFound,
118 })
119}
120
121#[derive(Debug, Error)]
123pub(crate) enum ListeningError {
124 #[error("failed to resolve network address: {0}")]
126 ResolveAddress(ResolveAddressError),
127
128 #[error("failed to listen on {address}: {error}")]
130 Listen {
131 address: SocketAddr,
133 error: Box<dyn std::error::Error + Send + Sync>,
135 },
136}
137
138pub(crate) fn start_listening(address: &str) -> Result<Builder<AddrIncoming>, ListeningError> {
139 let address = resolve_address(address).map_err(|error| {
140 warn!(%error, %address, "failed to start HTTP server, cannot parse address");
141 ListeningError::ResolveAddress(error)
142 })?;
143
144 Server::try_bind(&address).map_err(|error| {
145 warn!(%error, %address, "failed to start HTTP server");
146 ListeningError::Listen {
147 address,
148 error: Box::new(error),
149 }
150 })
151}
152
/// Moves `value` onto the heap and leaks the allocation, returning a `'static` reference to it.
///
/// The memory is never reclaimed.
#[inline]
pub(crate) fn leak<T>(value: T) -> &'static T {
    let boxed = Box::new(value);
    Box::leak(boxed)
}
158
159#[derive(Copy, Clone, DataSize, Debug)]
161pub(crate) struct SharedFlag(&'static AtomicBool);
162
163impl SharedFlag {
164 pub(crate) fn new() -> Self {
168 SharedFlag(leak(AtomicBool::new(false)))
169 }
170
171 pub(crate) fn is_set(self) -> bool {
173 self.0.load(Ordering::SeqCst)
174 }
175
176 pub(crate) fn set(self) {
178 self.0.store(true, Ordering::SeqCst);
179 }
180
181 #[cfg(test)]
185 pub(crate) fn global_shared() -> Self {
186 static SHARED_FLAG: Lazy<SharedFlag> = Lazy::new(SharedFlag::new);
187
188 *SHARED_FLAG
189 }
190}
191
192impl Default for SharedFlag {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198#[derive(Clone, DataSize, Debug)]
202pub struct WithDir<T> {
203 dir: PathBuf,
204 value: T,
205}
206
207impl<T> WithDir<T> {
208 pub fn new<P: Into<PathBuf>>(path: P, value: T) -> Self {
210 WithDir {
211 dir: path.into(),
212 value,
213 }
214 }
215
216 pub fn dir(&self) -> &Path {
218 self.dir.as_ref()
219 }
220
221 pub(crate) fn into_parts(self) -> (PathBuf, T) {
223 (self.dir, self.value)
224 }
225
226 pub fn map_ref<U, F: FnOnce(&T) -> U>(&self, f: F) -> WithDir<U> {
228 WithDir {
229 dir: self.dir.clone(),
230 value: f(&self.value),
231 }
232 }
233
234 pub fn value(&self) -> &T {
236 &self.value
237 }
238
239 pub fn value_mut(&mut self) -> &mut T {
241 &mut self.value
242 }
243
244 pub fn with_dir(&self, path: PathBuf) -> PathBuf {
246 if path.is_relative() {
247 self.dir.join(path)
248 } else {
249 path
250 }
251 }
252}
253
254#[derive(Clone, Debug, Serialize)]
256pub(crate) enum Source {
257 PeerGossiped(NodeId),
259 Peer(NodeId),
261 Client,
263 SpeculativeExec,
265 Ourself,
267}
268
269impl Source {
270 #[allow(clippy::wrong_self_convention)]
271 pub(crate) fn is_client(&self) -> bool {
272 match self {
273 Source::Client | Source::SpeculativeExec => true,
274 Source::PeerGossiped(_) | Source::Peer(_) | Source::Ourself => false,
275 }
276 }
277
278 pub(crate) fn node_id(&self) -> Option<NodeId> {
280 match self {
281 Source::Peer(node_id) | Source::PeerGossiped(node_id) => Some(*node_id),
282 Source::Client | Source::SpeculativeExec | Source::Ourself => None,
283 }
284 }
285}
286
287impl Display for Source {
288 fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
289 match self {
290 Source::PeerGossiped(node_id) | Source::Peer(node_id) => {
291 Display::fmt(node_id, formatter)
292 }
293 Source::Client => write!(formatter, "client"),
294 Source::SpeculativeExec => write!(formatter, "client (speculative exec)"),
295 Source::Ourself => write!(formatter, "ourself"),
296 }
297 }
298}
299
/// Divides `numerator` by `denominator`, adding half of the denominator to the numerator first
/// so that the quotient is rounded rather than truncated.
///
/// Concretely, this computes `(numerator + denominator / 2) / denominator` using the operators
/// of `T`.
pub(crate) fn div_round<T>(numerator: T, denominator: T) -> T
where
    T: Add<Output = T> + Div<Output = T> + From<u8> + Copy,
{
    let two = T::from(2);
    let half_denominator = denominator / two;
    let biased_numerator = numerator + half_denominator;
    biased_numerator / denominator
}
309
310pub(crate) fn register_histogram_metric(
312 registry: &Registry,
313 metric_name: &str,
314 metric_help: &str,
315 buckets: Vec<f64>,
316) -> Result<Histogram, prometheus::Error> {
317 let histogram_opts = HistogramOpts::new(metric_name, metric_help).buckets(buckets);
318 let histogram = Histogram::with_opts(histogram_opts)?;
319 registry.register(Box::new(histogram.clone()))?;
320 Ok(histogram)
321}
322
/// Unregisters a clone of `$metric` from `$registry`.
///
/// If unregistering fails, the failure is logged at `error` level and otherwise ignored.
#[macro_export]
macro_rules! unregister_metric {
    ($registry:expr, $metric:expr) => {
        if $registry.unregister(Box::new($metric.clone())).is_err() {
            tracing::error!(
                "unregistering {} failed: was not registered",
                stringify!($metric)
            )
        };
    };
}
337
/// XORs every byte of `rhs` into the byte at the same position in `lhs`, in place.
///
/// # Panics
///
/// Panics if `lhs` and `rhs` differ in length.
#[inline]
pub(crate) fn xor(lhs: &mut [u8], rhs: &[u8]) {
    assert_eq!(lhs.len(), rhs.len(), "xor inputs should have equal length");

    for (target, mask) in lhs.iter_mut().zip(rhs) {
        *target ^= *mask;
    }
}
351
/// Drops `arc`, then polls until no strong references to its allocation remain.
///
/// The strong count is checked up to `attempts` times, sleeping for `retry_delay` after each
/// unsuccessful check. Returns `true` as soon as the count is observed to be zero. If all
/// attempts are used up, an error is logged and `false` is returned.
#[cfg(test)]
pub(crate) async fn wait_for_arc_drop<T>(
    arc: Arc<T>,
    attempts: usize,
    retry_delay: Duration,
) -> bool {
    // Keep only a weak handle so that this function does not itself keep the value alive.
    let observer = Arc::downgrade(&arc);
    drop(arc);

    let mut remaining = attempts;
    while remaining > 0 {
        if observer.strong_count() == 0 {
            return true;
        }

        tokio::time::sleep(retry_delay).await;
        remaining -= 1;
    }

    error!(
        attempts, ?retry_delay, ty=%any::type_name::<T>(),
        "failed to clean up shared reference"
    );

    false
}
391
/// A pair of readings, one from the monotonic clock and one from the wall clock, captured
/// together so that `Instant`s can later be translated into `SystemTime`s.
#[derive(Copy, Clone, Debug)]
pub(crate) struct TimeAnchor {
    /// The monotonic reading taken when the anchor was created.
    now: Instant,
    /// The wall-clock reading taken when the anchor was created.
    wall_clock_now: SystemTime,
}

impl TimeAnchor {
    /// Creates an anchor by sampling `Instant::now()` followed by `SystemTime::now()`.
    pub(crate) fn now() -> Self {
        let now = Instant::now();
        let wall_clock_now = SystemTime::now();
        Self {
            now,
            wall_clock_now,
        }
    }

    /// Translates `then` into a `SystemTime` by applying its offset from the anchor's monotonic
    /// reading to the anchor's wall-clock reading.
    #[inline]
    pub(crate) fn convert(&self, then: Instant) -> SystemTime {
        match then.checked_duration_since(self.now) {
            // `then` is at or after the anchor: shift the wall clock forwards.
            Some(ahead) => self.wall_clock_now + ahead,
            // `then` lies before the anchor: shift the wall clock backwards.
            None => self.wall_clock_now - self.now.duration_since(then),
        }
    }
}
422
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use crate::utils::SharedFlag;

    use super::{wait_for_arc_drop, xor};

    /// XORing two equally long inputs yields the expected bytes in `lhs`.
    #[test]
    fn xor_works() {
        let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11, 0x12, 0x23];
        let expected = [0x47, 0x58, 0xae, 0x8e, 0x46, 0x61, 0xe9, 0xd7];
        let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];

        xor(&mut lhs, &rhs);

        assert_eq!(lhs, expected);
    }

    /// Inputs of different lengths must trigger the length assertion.
    #[test]
    #[should_panic(expected = "equal length")]
    fn xor_panics_on_uneven_inputs() {
        let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11];
        let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];

        xor(&mut lhs, &rhs);
    }

    /// `wait_for_arc_drop` reports failure while another strong reference exists, and success
    /// once the last one is gone.
    #[tokio::test]
    async fn arc_drop_waits_for_drop() {
        let attempts = 15;
        let retry_delay = Duration::from_millis(25);

        let arc = Arc::new(());

        let arc_in_background = arc.clone();
        let _weak_in_background = Arc::downgrade(&arc);

        // `arc_in_background` is still alive, so the wait has to run out of attempts.
        let dropped_too_early = wait_for_arc_drop(arc, attempts, retry_delay).await;
        assert!(!dropped_too_early);

        let arc = arc_in_background.clone();

        let weak = Arc::downgrade(&arc);

        // After this, the `arc` passed to `wait_for_arc_drop` is the only strong reference left.
        drop(arc_in_background);
        let dropped = wait_for_arc_drop(arc, attempts, retry_delay).await;
        assert!(dropped);

        assert!(weak.upgrade().is_none());
    }

    /// Setting the flag through one handle is visible through a copy of it, on repeated reads.
    #[test]
    fn shared_flag_sanity_check() {
        let flag = SharedFlag::new();
        let copied = flag;

        for _ in 0..2 {
            assert!(!flag.is_set());
            assert!(!copied.is_set());
        }

        flag.set();

        for _ in 0..2 {
            assert!(flag.is_set());
            assert!(copied.is_set());
        }
    }
}