pub mod backoff_util;
pub mod broadcaststream;
pub mod update_merge;
use std::convert::Infallible;
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::hash::Hash;
use std::io::Write;
use std::path::Path;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::LazyLock;
use std::{fs, io};
use anyhow::format_err;
use fedimint_logging::LOG_CORE;
pub use fedimint_util_error::*;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tracing::{Instrument, Span, debug, warn};
use url::{Host, ParseError, Url};
use crate::envs::{FM_DEBUG_SHOW_SECRETS_ENV, is_env_var_set};
use crate::task::MaybeSend;
use crate::{apply, async_trait_maybe_send, maybe_add_send, runtime};
/// A pinned, heap-allocated, type-erased future yielding `T` that may borrow
/// data for the lifetime `'a`.
///
/// The trait object is passed through `maybe_add_send!`; presumably that adds
/// a `Send` bound on targets that support it — confirm against the macro
/// definition.
pub type BoxFuture<'a, T> = Pin<Box<maybe_add_send!(dyn Future<Output = T> + 'a)>>;
/// A pinned, heap-allocated, type-erased stream of `T` items that may borrow
/// data for the lifetime `'a`.
///
/// Like [`BoxFuture`], the trait object is passed through `maybe_add_send!`
/// (presumably a conditional `Send` bound — confirm against the macro).
pub type BoxStream<'a, T> = Pin<Box<maybe_add_send!(dyn futures::Stream<Item = T> + 'a)>>;
/// Convenience methods for taking the next item from a stream-like source
/// without having to handle `Option` at every call site.
///
/// This file provides a blanket implementation for every
/// `futures::Stream + Unpin` type.
#[apply(async_trait_maybe_send!)]
pub trait NextOrPending {
/// The item type produced by the source.
type Output;
/// Waits for the next item.
///
/// In the blanket stream implementation, a stream that has ended makes this
/// future stay pending forever instead of producing an error.
async fn next_or_pending(&mut self) -> Self::Output;
/// Waits for the next item, reporting an error if there is none.
///
/// In the blanket stream implementation, a stream that has ended yields an
/// `Err` with the message "Stream was unexpectedly closed".
async fn ok(&mut self) -> anyhow::Result<Self::Output>;
}
/// Blanket implementation of [`NextOrPending`] for every unpinnable stream.
#[apply(async_trait_maybe_send!)]
impl<S> NextOrPending for S
where
    S: futures::Stream + Unpin + MaybeSend,
    S::Item: MaybeSend,
{
    type Output = S::Item;

    /// Returns the next item, or an error if the stream has already ended.
    async fn ok(&mut self) -> anyhow::Result<Self::Output> {
        self.next()
            .await
            .ok_or_else(|| format_err!("Stream was unexpectedly closed"))
    }

    /// Returns the next item; if the stream has ended, logs at debug level
    /// and then never resolves.
    async fn next_or_pending(&mut self) -> Self::Output {
        match self.next().await {
            Some(item) => item,
            None => {
                debug!(target: LOG_CORE, "Stream ended in next_or_pending, pending forever to avoid throwing an error on shutdown");
                // Never completes: the caller is expected to be cancelled
                // (e.g. during shutdown) rather than to observe an error.
                std::future::pending().await
            }
        }
    }
}
/// Newtype wrapper around [`Url`] with hand-written `Display` and `Debug`
/// implementations (defined in this file) that replace the username and
/// password with placeholders unless the show-secrets debug environment
/// variable is set.
///
/// Serialization, hashing, equality and ordering are derived and therefore
/// delegate to the wrapped [`Url`].
#[derive(Hash, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeUrl(Url);
impl SafeUrl {
pub fn parse(url_str: &str) -> Result<Self, ParseError> {
Url::parse(url_str).map(SafeUrl)
}
pub fn to_unsafe(self) -> Url {
self.0
}
#[allow(clippy::result_unit_err)] pub fn set_username(&mut self, username: &str) -> Result<(), ()> {
self.0.set_username(username)
}
#[allow(clippy::result_unit_err)] pub fn set_password(&mut self, password: Option<&str>) -> Result<(), ()> {
self.0.set_password(password)
}
#[allow(clippy::result_unit_err)] pub fn without_auth(&self) -> Result<Self, ()> {
let mut url = self.clone();
url.set_username("").and_then(|()| url.set_password(None))?;
Ok(url)
}
pub fn host(&self) -> Option<Host<&str>> {
self.0.host()
}
pub fn host_str(&self) -> Option<&str> {
self.0.host_str()
}
pub fn scheme(&self) -> &str {
self.0.scheme()
}
pub fn port(&self) -> Option<u16> {
self.0.port()
}
pub fn port_or_known_default(&self) -> Option<u16> {
self.0.port_or_known_default()
}
pub fn path(&self) -> &str {
self.0.path()
}
pub fn as_str(&self) -> &str {
self.0.as_str()
}
pub fn username(&self) -> &str {
self.0.username()
}
pub fn password(&self) -> Option<&str> {
self.0.password()
}
pub fn join(&self, input: &str) -> Result<Self, ParseError> {
self.0.join(input).map(SafeUrl)
}
#[allow(clippy::case_sensitive_file_extension_comparisons)]
pub fn is_onion_address(&self) -> bool {
let host = self.host_str().unwrap_or_default();
host.ends_with(".onion")
}
pub fn fragment(&self) -> Option<&str> {
self.0.fragment()
}
pub fn set_fragment(&mut self, arg: Option<&str>) {
self.0.set_fragment(arg);
}
}
/// Lazily evaluated flag: `true` when the `FM_DEBUG_SHOW_SECRETS_ENV`
/// environment variable is set. A warning is logged the first time the flag is
/// read and found to be enabled.
static SHOW_SECRETS: LazyLock<bool> = LazyLock::new(|| {
    if !is_env_var_set(FM_DEBUG_SHOW_SECRETS_ENV) {
        return false;
    }
    warn!(target: LOG_CORE, "{} enabled. Please don't use in production.", FM_DEBUG_SHOW_SECRETS_ENV);
    true
});
/// Formats the URL as `scheme://[user[:pass]@]host[:port]path`.
///
/// Credentials are replaced with `REDACTEDUSER` / `REDACTEDPASS` unless
/// `SHOW_SECRETS` is enabled. The auth section (including any password) is
/// only emitted when the username is non-empty. Query and fragment are not
/// written.
impl Display for SafeUrl {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let url = &self.0;
        write!(f, "{}://", url.scheme())?;

        let username = url.username();
        if !username.is_empty() {
            // Only consult (and thereby initialize) the flag when there is
            // actually something to redact.
            let show_secrets = *SHOW_SECRETS;

            if show_secrets {
                write!(f, "{username}")?;
            } else {
                write!(f, "REDACTEDUSER")?;
            }

            match url.password() {
                Some(password) if show_secrets => write!(f, ":{password}")?,
                Some(_) => write!(f, ":REDACTEDPASS")?,
                None => {}
            }

            write!(f, "@")?;
        }

        if let Some(host) = url.host_str() {
            write!(f, "{host}")?;
        }

        if let Some(port) = url.port() {
            write!(f, ":{port}")?;
        }

        write!(f, "{}", url.path())
    }
}
/// Debug output is `SafeUrl(<Display output>)`, so it applies the same
/// credential redaction as the `Display` implementation.
impl Debug for SafeUrl {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("SafeUrl(")?;
        Display::fmt(self, f)?;
        f.write_str(")")
    }
}
impl From<Url> for SafeUrl {
fn from(u: Url) -> Self {
Self(u)
}
}
impl FromStr for SafeUrl {
type Err = ParseError;
#[inline]
fn from_str(input: &str) -> Result<Self, ParseError> {
Self::parse(input)
}
}
/// Creates a new file at `path`, writes `contents` to it and syncs it to
/// disk.
///
/// # Errors
///
/// Fails if the file already exists (it is opened with `create_new`), or if
/// opening, writing or syncing fails.
#[cfg(not(target_family = "wasm"))]
pub fn write_new<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
    let mut opts = fs::OpenOptions::new();
    opts.write(true).create_new(true);

    let mut new_file = opts.open(path)?;
    new_file.write_all(contents.as_ref())?;
    new_file.sync_all()
}
/// Writes `contents` to the file at `path`, creating it if necessary and
/// truncating any previous contents.
///
/// Unlike `write_new`, this function does not sync the file to disk.
///
/// # Errors
///
/// Returns any error from opening the file or writing to it.
#[cfg(not(target_family = "wasm"))]
pub fn write_overwrite<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
    let mut opts = fs::OpenOptions::new();
    opts.write(true).create(true).truncate(true);

    let mut target = opts.open(path)?;
    target.write_all(contents.as_ref())
}
/// Asynchronously writes `contents` to the file at `path`, creating it if
/// necessary and truncating any previous contents.
///
/// The file is flushed before returning so that the write has actually been
/// handed to the operating system and any write error is reported to the
/// caller. The data is not synced to disk.
///
/// # Errors
///
/// Returns any error from opening, writing or flushing the file.
#[cfg(not(target_family = "wasm"))]
pub async fn write_overwrite_async<P: AsRef<Path>, C: AsRef<[u8]>>(
    path: P,
    contents: C,
) -> io::Result<()> {
    let mut file = tokio::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)
        .await?;
    file.write_all(contents.as_ref()).await?;
    // `tokio::fs::File` performs writes on a background task; `write_all` can
    // return before the data has been written. Without a flush, dropping the
    // file here could silently discard a failed write.
    file.flush().await
}
/// Asynchronously creates a new file at `path` and writes `contents` to it.
///
/// The file is flushed before returning so that the write has actually been
/// handed to the operating system and any write error is reported to the
/// caller. The data is not synced to disk.
///
/// # Errors
///
/// Fails if the file already exists (it is opened with `create_new`), or if
/// opening, writing or flushing fails.
#[cfg(not(target_family = "wasm"))]
pub async fn write_new_async<P: AsRef<Path>, C: AsRef<[u8]>>(
    path: P,
    contents: C,
) -> io::Result<()> {
    let mut file = tokio::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)
        .await?;
    file.write_all(contents.as_ref()).await?;
    // `tokio::fs::File` performs writes on a background task; `write_all` can
    // return before the data has been written. Without a flush, dropping the
    // file here could silently discard a failed write.
    file.flush().await
}
/// A value of type `T` stored together with a tracing [`Span`].
///
/// The methods on this type use the stored span to instrument work done on
/// the value (see `with` and `with_sync`).
#[derive(Debug, Clone)]
pub struct Spanned<T> {
// The wrapped value.
value: T,
// Span associated with the value; cloned whenever a derived `Spanned` is made.
span: Span,
}
impl<T> Spanned<T> {
    /// Awaits `make` inside `span` and stores its output together with that
    /// span.
    pub async fn new<F: Future<Output = T>>(span: Span, make: F) -> Self {
        match Self::try_new::<Infallible, _>(span, async { Ok(make.await) }).await {
            Ok(spanned) => spanned,
            // `Infallible` has no values, so this arm can never be taken.
            Err(never) => match never {},
        }
    }

    /// Fallible variant of [`Spanned::new`]: awaits `make` inside `span` and,
    /// on success, stores the produced value together with the span.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `make` unchanged.
    pub async fn try_new<E, F: Future<Output = Result<T, E>>>(
        span: Span,
        make: F,
    ) -> Result<Self, E> {
        // One handle is stored in the result, the other instruments the
        // future that builds the value.
        let stored_span = span.clone();
        async {
            Ok(Self {
                value: make.await?,
                span: stored_span,
            })
        }
        .instrument(span)
        .await
    }

    /// Returns a `Spanned` holding a shared reference to the value and a
    /// clone of the span.
    pub fn borrow(&self) -> Spanned<&T> {
        Spanned {
            value: &self.value,
            span: self.span.clone(),
        }
    }

    /// Transforms the wrapped value with `map`, keeping the same span.
    ///
    /// The closure is invoked exactly once, so any `FnOnce` is accepted
    /// (including closures that move captured state into the result). It is
    /// not run inside the span.
    pub fn map<U>(self, map: impl FnOnce(T) -> U) -> Spanned<U> {
        Spanned {
            value: map(self.value),
            span: self.span,
        }
    }

    /// Returns a `Spanned` holding a mutable reference to the value and a
    /// clone of the span.
    pub fn borrow_mut(&mut self) -> Spanned<&mut T> {
        Spanned {
            value: &mut self.value,
            span: self.span.clone(),
        }
    }

    /// Consumes `self` and runs the synchronous closure `f` on the value
    /// while the span is entered.
    pub fn with_sync<O, F: FnOnce(T) -> O>(self, f: F) -> O {
        let _g = self.span.enter();
        f(self.value)
    }

    /// Consumes `self` and runs `f` on the value, with both the call to `f`
    /// and the future it returns instrumented by the span.
    pub async fn with<Fut: Future, F: FnOnce(T) -> Fut>(self, f: F) -> Fut::Output {
        async { f(self.value).await }.instrument(self.span).await
    }

    /// Returns a clone of the stored span.
    pub fn span(&self) -> Span {
        self.span.clone()
    }

    /// Shared access to the wrapped value.
    pub fn value(&self) -> &T {
        &self.value
    }

    /// Mutable access to the wrapped value.
    pub fn value_mut(&mut self) -> &mut T {
        &mut self.value
    }

    /// Consumes `self`, discarding the span and returning the value.
    pub fn into_value(self) -> T {
        self.value
    }
}
/// If the first command-line argument (after the program name) is exactly
/// `version-hash`, prints `version_hash` to stdout and exits the process with
/// status 0. Otherwise returns without doing anything.
pub fn handle_version_hash_command(version_hash: &str) {
    let first_arg = std::env::args().nth(1);
    if first_arg.as_deref() != Some("version-hash") {
        return;
    }

    println!("{version_hash}");
    std::process::exit(0);
}
/// Runs `op_fn` repeatedly until it succeeds or `strategy` is exhausted.
///
/// After each failure the next interval is requested from `strategy`:
/// * if one is available, the failure is logged at debug level (the interval
///   is reported in whole seconds) and the task sleeps for that interval
///   before trying again;
/// * if none is left, the failure is logged at warn level and the last error
///   is returned.
///
/// `op_name` is only used in the log messages.
///
/// # Errors
///
/// Returns the error of the final attempt once `strategy` yields no further
/// interval.
pub async fn retry<F, Fut, T>(
    op_name: impl Into<String>,
    strategy: impl backoff_util::Backoff,
    op_fn: F,
) -> Result<T, anyhow::Error>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, anyhow::Error>>,
{
    let mut backoff = strategy;
    let op_name = op_name.into();
    let mut attempts: u64 = 0;

    loop {
        attempts += 1;

        let err = match op_fn().await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };

        // No interval left means the retry budget is used up.
        let Some(interval) = backoff.next() else {
            warn!(
                target: LOG_CORE,
                err = %err.fmt_compact_anyhow(),
                %attempts,
                "{} failed",
                op_name,
            );
            return Err(err);
        };

        debug!(
            target: LOG_CORE,
            err = %err.fmt_compact_anyhow(),
            %attempts,
            interval = interval.as_secs(),
            "{} failed, retrying",
            op_name,
        );
        runtime::sleep(interval).await;
    }
}
/// Returns the positional median of `vals`, or `None` for an empty slice.
///
/// The slice is not sorted by this function, so the result is the statistical
/// median only if the caller passes values that are already sorted. For an
/// even number of elements the two middle elements are combined with
/// [`u64::midpoint`].
pub fn get_median(vals: &[u64]) -> Option<u64> {
    let mid = vals.len() / 2;
    // `get` yields `None` only for an empty slice, which doubles as the
    // emptiness check.
    let upper = *vals.get(mid)?;

    let median = if vals.len().is_multiple_of(2) {
        // Even length is at least 2 here, so `mid - 1` is in bounds.
        u64::midpoint(vals[mid - 1], upper)
    } else {
        upper
    };
    Some(median)
}
/// Returns the arithmetic mean of `vals`, rounded down, or `None` for an
/// empty slice.
///
/// The sum is accumulated in `u128`, so large inputs cannot overflow: a slice
/// holds fewer than 2^64 elements, each below 2^64, which keeps the total
/// under 2^128.
pub fn get_average(vals: &[u64]) -> Option<u64> {
    if vals.is_empty() {
        return None;
    }

    let sum: u128 = vals.iter().copied().map(u128::from).sum();
    // Widening cast: `usize` always fits in `u128`.
    let average = sum / vals.len() as u128;

    // The mean of `u64` values can never exceed the largest of them.
    Some(u64::try_from(average).expect("average of u64 values fits in u64"))
}
#[cfg(test)]
mod tests;