mod display_error;
pub(crate) mod ds;
mod external;
pub(crate) mod pid_file;
#[cfg(target_os = "linux")]
pub(crate) mod rlimit;
mod round_robin;
use std::{
any,
cell::RefCell,
fmt::{self, Debug, Display, Formatter},
fs,
io::{self, Write},
net::{SocketAddr, ToSocketAddrs},
ops::{Add, BitXorAssign, Div},
os::unix::fs::OpenOptionsExt,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use datasize::DataSize;
use hyper::server::{conn::AddrIncoming, Builder, Server};
#[cfg(test)]
use once_cell::sync::Lazy;
use prometheus::{self, Histogram, HistogramOpts, Registry};
use serde::Serialize;
use thiserror::Error;
use tracing::{error, warn};
pub(crate) use display_error::display_error;
#[cfg(test)]
pub(crate) use external::RESOURCES_PATH;
pub(crate) use external::{External, LoadError, Loadable};
pub(crate) use round_robin::WeightedRoundRobin;
/// Error returned when a textual address could not be turned into a socket address.
///
/// The rendered message contains the offending address and the failure reason.
#[derive(Debug, Error)]
#[error("could not resolve `{address}`: {kind}")]
pub struct ResolveAddressError {
/// The address string that was handed in for resolution.
address: String,
/// The reason the resolution failed.
kind: ResolveAddressErrorKind,
}
/// The reason an address resolution failed; rendered as part of `ResolveAddressError`'s message.
#[derive(Debug)]
enum ResolveAddressErrorKind {
/// The resolution attempt itself returned an I/O error.
ErrorResolving(io::Error),
/// The resolution attempt succeeded but yielded no addresses.
NoAddressFound,
}
impl Display for ResolveAddressErrorKind {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
ResolveAddressErrorKind::ErrorResolving(err) => {
write!(f, "could not run dns resolution: {}", err)
}
ResolveAddressErrorKind::NoAddressFound => {
write!(f, "no addresses found")
}
}
}
}
pub(crate) fn resolve_address(address: &str) -> Result<SocketAddr, ResolveAddressError> {
address
.to_socket_addrs()
.map_err(|err| ResolveAddressError {
address: address.to_string(),
kind: ResolveAddressErrorKind::ErrorResolving(err),
})?
.next()
.ok_or_else(|| ResolveAddressError {
address: address.to_string(),
kind: ResolveAddressErrorKind::NoAddressFound,
})
}
/// Error returned by `start_listening` when the HTTP server could not be set up.
#[derive(Debug, Error)]
pub(crate) enum ListeningError {
/// The configured address could not be resolved to a socket address.
#[error("failed to resolve network address: {0}")]
ResolveAddress(ResolveAddressError),
/// Binding to the resolved socket address failed.
#[error("failed to listen on {address}: {error}")]
Listen {
/// The socket address that binding was attempted on.
address: SocketAddr,
/// The underlying error reported by the bind attempt.
error: Box<dyn std::error::Error + Send + Sync>,
},
}
pub(crate) fn start_listening(address: &str) -> Result<Builder<AddrIncoming>, ListeningError> {
let address = resolve_address(address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server, cannot parse address");
ListeningError::ResolveAddress(error)
})?;
Server::try_bind(&address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server");
ListeningError::Listen {
address,
error: Box::new(error),
}
})
}
/// Moves `value` onto the heap and leaks it, returning a shared reference with `'static` lifetime.
///
/// The allocation is never freed.
#[inline]
pub(crate) fn leak<T>(value: T) -> &'static T {
    let boxed = Box::new(value);
    Box::leak(boxed)
}
/// A copyable handle to a boolean flag stored in a `'static` `AtomicBool`.
///
/// Copies of a handle refer to the same underlying atomic, so setting the flag through one copy is
/// observable through all of them.
#[derive(Copy, Clone, DataSize, Debug)]
pub(crate) struct SharedFlag(&'static AtomicBool);
impl SharedFlag {
pub(crate) fn new() -> Self {
SharedFlag(leak(AtomicBool::new(false)))
}
pub(crate) fn is_set(self) -> bool {
self.0.load(Ordering::SeqCst)
}
pub(crate) fn set(self) {
self.0.store(true, Ordering::SeqCst)
}
#[cfg(test)]
pub(crate) fn global_shared() -> Self {
static SHARED_FLAG: Lazy<SharedFlag> = Lazy::new(SharedFlag::new);
*SHARED_FLAG
}
}
impl Default for SharedFlag {
fn default() -> Self {
Self::new()
}
}
/// Wrapper that lets an owned value be consumed through a shared reference.
///
/// The value is kept in a `RefCell<Option<_>>`; the `Display` implementation takes it out on first
/// use, so it can be formatted only once.
#[derive(Debug)]
pub(crate) struct DisplayIter<T>(RefCell<Option<T>>);
impl<T> DisplayIter<T> {
    /// Wraps `item` so that it can later be taken out through a shared reference.
    pub(crate) fn new(item: T) -> Self {
        let slot = RefCell::new(Some(item));
        DisplayIter(slot)
    }
}
impl<I, T> Display for DisplayIter<I>
where
    I: IntoIterator<Item = T>,
    T: Display,
{
    /// Writes the wrapped items separated by `", "`.
    ///
    /// The formatter's width, if given, is used as the maximum number of items to print rather
    /// than as padding. The wrapped value is consumed by the first call; later calls write the
    /// placeholder `DisplayIter:GONE`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self.0.borrow_mut().take() {
            None => f.write_str("DisplayIter:GONE"),
            Some(source) => {
                let limit = f.width().unwrap_or(usize::MAX);
                let mut items = source.into_iter().take(limit);
                // The first item is written bare; every later item gets a separator in front.
                if let Some(head) = items.next() {
                    write!(f, "{}", head)?;
                    for item in items {
                        write!(f, ", {}", item)?;
                    }
                }
                Ok(())
            }
        }
    }
}
/// Error returned by `read_file`, pairing the path that was read with the I/O error.
#[derive(Debug, Error)]
#[error("could not read '{0}': {error}", .path.display())]
pub struct ReadFileError {
/// The path of the file that could not be read.
path: PathBuf,
/// The underlying I/O error, also exposed as the error's source.
#[source]
error: io::Error,
}
/// Error returned by the file-writing helpers, pairing the target path with the I/O error.
#[derive(Debug, Error)]
#[error("could not write to '{0}': {error}", .path.display())]
pub struct WriteFileError {
/// The path of the file that could not be written.
path: PathBuf,
/// The underlying I/O error, also exposed as the error's source.
#[source]
error: io::Error,
}
pub fn read_file<P: AsRef<Path>>(filename: P) -> Result<Vec<u8>, ReadFileError> {
let path = filename.as_ref();
fs::read(path).map_err(|error| ReadFileError {
path: path.to_owned(),
error,
})
}
pub(crate) fn write_file<P: AsRef<Path>, B: AsRef<[u8]>>(
filename: P,
data: B,
) -> Result<(), WriteFileError> {
let path = filename.as_ref();
fs::write(path, data.as_ref()).map_err(|error| WriteFileError {
path: path.to_owned(),
error,
})
}
/// Writes `data` to the file at `filename`, creating it with mode `0o600` if it does not exist.
///
/// Any previous contents are replaced: the file is truncated on open, so overwriting a longer file
/// with shorter data leaves no stale trailing bytes behind.
///
/// The mode is only applied when the file is newly created; the permissions of an already existing
/// file are left as they are.
///
/// # Errors
///
/// Returns a `WriteFileError` holding the path and the underlying I/O error if the file cannot be
/// opened or written.
pub(crate) fn write_private_file<P: AsRef<Path>, B: AsRef<[u8]>>(
    filename: P,
    data: B,
) -> Result<(), WriteFileError> {
    let path = filename.as_ref();
    fs::OpenOptions::new()
        .write(true)
        .create(true)
        // Without truncation, leftover bytes of a longer, pre-existing file would survive the write.
        .truncate(true)
        .mode(0o600)
        .open(path)
        .and_then(|mut file| file.write_all(data.as_ref()))
        .map_err(|error| WriteFileError {
            path: path.to_owned(),
            error,
        })
}
/// A value paired with a directory path.
///
/// The directory serves as the base against which `with_dir` resolves relative paths.
#[derive(Clone, DataSize, Debug)]
pub struct WithDir<T> {
/// The directory associated with `value`.
dir: PathBuf,
/// The wrapped value.
value: T,
}
impl<T> WithDir<T> {
    /// Pairs `value` with the directory `path`.
    pub fn new<P: Into<PathBuf>>(path: P, value: T) -> Self {
        let dir = path.into();
        Self { dir, value }
    }

    /// Returns the associated directory.
    pub fn dir(&self) -> &Path {
        self.dir.as_path()
    }

    /// Consumes `self`, returning the directory and the wrapped value.
    pub(crate) fn into_parts(self) -> (PathBuf, T) {
        let Self { dir, value } = self;
        (dir, value)
    }

    /// Builds a new `WithDir` with the same (cloned) directory and the result of applying `f` to a
    /// reference to the wrapped value.
    pub fn map_ref<U, F: FnOnce(&T) -> U>(&self, f: F) -> WithDir<U> {
        let dir = self.dir.clone();
        let value = f(&self.value);
        WithDir { dir, value }
    }

    /// Returns a reference to the wrapped value.
    pub fn value(&self) -> &T {
        let WithDir { value, .. } = self;
        value
    }

    /// Resolves `path` against the associated directory.
    ///
    /// Absolute paths are returned unchanged; relative paths are joined onto the directory.
    pub fn with_dir(&self, path: PathBuf) -> PathBuf {
        if path.is_absolute() {
            return path;
        }
        self.dir.join(path)
    }
}
/// Identifies where something originated from.
///
/// NOTE(review): the exact meaning of each variant is inferred from its name; confirm against the
/// call sites.
#[derive(Clone, Debug, Serialize)]
pub(crate) enum Source<I> {
/// A peer, carrying its identifier of type `I`.
Peer(I),
/// A client.
Client,
/// This node itself.
Ourself,
}
impl<I> Source<I> {
    /// Returns `true` only for `Source::Client`.
    // The `from_` prefix normally signals a constructor; here it reads as a predicate.
    #[allow(clippy::wrong_self_convention)]
    pub(crate) fn from_client(&self) -> bool {
        match self {
            Source::Client => true,
            Source::Peer(_) | Source::Ourself => false,
        }
    }
}
impl<I: Clone> Source<I> {
    /// Returns a clone of the peer's identifier, or `None` if the source is not a peer.
    pub(crate) fn node_id(&self) -> Option<I> {
        if let Source::Peer(id) = self {
            Some(id.clone())
        } else {
            None
        }
    }
}
impl<I: Display> Display for Source<I> {
    /// Displays a peer as its identifier and the other variants as fixed lowercase names.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Source::Client => formatter.write_str("client"),
            Source::Ourself => formatter.write_str("ourself"),
            // Delegate directly so the caller's formatting options reach the identifier.
            Source::Peer(node_id) => node_id.fmt(formatter),
        }
    }
}
/// Divides `numerator` by `denominator`, adding half of the denominator first so that the result
/// is rounded rather than truncated.
///
/// The intermediate sum `numerator + denominator / 2` is computed in `T`, so it is subject to
/// whatever overflow behavior `T`'s addition has. A zero denominator is subject to whatever `T`'s
/// division does in that case.
pub(crate) fn div_round<T>(numerator: T, denominator: T) -> T
where
    T: Add<Output = T> + Div<Output = T> + From<u8> + Copy,
{
    let two = T::from(2u8);
    let half_denominator = denominator / two;
    let biased_numerator = numerator + half_denominator;
    biased_numerator / denominator
}
/// Creates a histogram with the given name, help text and buckets, registers it with `registry`
/// and returns it.
///
/// # Errors
///
/// Propagates the `prometheus::Error` from either creating the histogram or registering it.
pub(crate) fn register_histogram_metric(
    registry: &Registry,
    metric_name: &str,
    metric_help: &str,
    buckets: Vec<f64>,
) -> Result<Histogram, prometheus::Error> {
    let opts = HistogramOpts::new(metric_name, metric_help).buckets(buckets);
    let histogram = Histogram::with_opts(opts)?;
    // The registry takes a boxed clone; the original is handed back to the caller.
    let collector = Box::new(histogram.clone());
    registry.register(collector)?;
    Ok(histogram)
}
/// Unregisters `$metric` from `$registry`.
///
/// A boxed clone of the metric is passed to the registry's `unregister` method. If that call
/// returns an error, the error value is discarded and an error-level message naming the metric
/// expression is logged instead; the macro itself never fails.
#[macro_export]
macro_rules! unregister_metric {
($registry:expr, $metric:expr) => {
$registry
.unregister(Box::new($metric.clone()))
.unwrap_or_else(|_| {
tracing::error!(
"unregistering {} failed: was not registered",
stringify!($metric)
)
});
};
}
/// XORs `rhs` into `lhs` byte by byte, modifying `lhs` in place.
///
/// # Panics
///
/// Panics if the two slices differ in length.
#[inline]
pub(crate) fn xor(lhs: &mut [u8], rhs: &[u8]) {
    assert_eq!(lhs.len(), rhs.len(), "xor inputs should have equal length");
    for (target, &mask) in lhs.iter_mut().zip(rhs) {
        target.bitxor_assign(mask);
    }
}
/// Drops `arc` and waits until every other strong reference to the same allocation is gone.
///
/// The strong count is checked before each of the `attempts` delays of `retry_delay`, and once
/// more after the last delay, so that a drop happening during the final delay is still observed.
///
/// Returns `true` as soon as a check finds no strong references left. If references remain after
/// the final check, an error is logged and `false` is returned.
pub(crate) async fn wait_for_arc_drop<T>(
    arc: Arc<T>,
    attempts: usize,
    retry_delay: Duration,
) -> bool {
    // Keep only a weak handle, so this function itself does not keep the value alive.
    let weak = Arc::downgrade(&arc);
    drop(arc);

    for _ in 0..attempts {
        if weak.strong_count() == 0 {
            return true;
        }
        tokio::time::sleep(retry_delay).await;
    }

    // The loop always ends on a sleep (or never ran when `attempts` is 0); look one last time
    // before declaring failure.
    if weak.strong_count() == 0 {
        return true;
    }

    error!(
        attempts, ?retry_delay, ty=%any::type_name::<T>(),
        "failed to clean up shared reference"
    );
    false
}
// Unit tests for `xor`, `wait_for_arc_drop` and `SharedFlag`.
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use crate::utils::SharedFlag;
use super::{wait_for_arc_drop, xor};
// XORing two equally long byte arrays yields the expected byte-wise result in `lhs`.
#[test]
fn xor_works() {
let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];
let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11, 0x12, 0x23];
let xor_result = [0x47, 0x58, 0xae, 0x8e, 0x46, 0x61, 0xe9, 0xd7];
xor(&mut lhs, &rhs);
assert_eq!(lhs, xor_result);
}
// Inputs of different length must trigger the length assertion in `xor`.
#[test]
#[should_panic(expected = "equal length")]
fn xor_panics_on_uneven_inputs() {
let mut lhs = [0x43, 0x53, 0xf2, 0x2f, 0xa9, 0x70, 0xfb, 0xf4];
let rhs = [0x04, 0x0b, 0x5c, 0xa1, 0xef, 0x11];
xor(&mut lhs, &rhs);
}
// `wait_for_arc_drop` reports failure while another strong reference exists and success once the
// passed-in `Arc` is the last one.
#[tokio::test]
async fn arc_drop_waits_for_drop() {
let retry_delay = Duration::from_millis(25);
let attempts = 15;
let arc = Arc::new(());
let arc_in_background = arc.clone();
// A weak reference alone must not influence the outcome.
let _weak_in_background = Arc::downgrade(&arc);
// `arc_in_background` is still alive, so the wait has to give up.
assert!(!wait_for_arc_drop(arc, attempts, retry_delay).await);
let arc = arc_in_background.clone();
let weak = Arc::downgrade(&arc);
// After this drop, `arc` is the only strong reference left.
drop(arc_in_background);
assert!(wait_for_arc_drop(arc, attempts, retry_delay).await);
// The value is really gone: the weak reference can no longer be upgraded.
assert!(weak.upgrade().is_none());
}
// A flag and its copy start unset, and setting one is visible through both.
#[test]
fn shared_flag_sanity_check() {
let flag = SharedFlag::new();
let copied = flag;
// Each check is repeated; reading the flag must not change it.
assert!(!flag.is_set());
assert!(!copied.is_set());
assert!(!flag.is_set());
assert!(!copied.is_set());
flag.set();
assert!(flag.is_set());
assert!(copied.is_set());
assert!(flag.is_set());
assert!(copied.is_set());
}
}