Skip to main content

ic_bn_lib/
lib.rs

1// Needed for certain macros
2#![recursion_limit = "256"]
3#![warn(clippy::nursery)]
4#![warn(tail_expr_drop_order)]
5#![allow(clippy::cognitive_complexity)]
6#![allow(clippy::field_reassign_with_default)]
7#![allow(clippy::collapsible_if)]
8
9#[cfg(feature = "custom-domains")]
10pub mod custom_domains;
11pub mod http;
12pub mod network;
13pub mod pubsub;
14#[cfg(feature = "smtp")]
15pub mod smtp;
16pub mod tasks;
17pub mod tests;
18pub mod tls;
19pub mod utils;
20#[cfg(feature = "vector")]
21pub mod vector;
22
23use std::{fs::File, path::Path};
24
25use anyhow::{Context, anyhow};
26use bytes::Bytes;
27use futures::StreamExt;
28use ic_bn_lib_common::Error;
29use tokio::io::AsyncWriteExt;
30
31pub use hickory_proto;
32pub use hickory_resolver;
33pub use hyper;
34pub use hyper_util;
35pub use ic_agent;
36pub use ic_bn_lib_common;
37#[cfg(feature = "smtp")]
38pub use mail_auth;
39pub use prometheus;
40pub use reqwest;
41pub use rustls;
42#[cfg(feature = "acme-alpn")]
43pub use rustls_acme;
44pub use uuid;
45
/// Converts a string representation to an `EmailAddress`.
///
/// Only available when the `smtp` feature is enabled: the expansion refers to
/// `$crate::smtp`, and that module is compiled only with this feature.
///
/// # Panics
///
/// Panics when `EmailAddress::from_text` fails, because its result is unwrapped.
#[cfg(feature = "smtp")]
#[macro_export]
macro_rules! email {
    ($email:expr) => {{ $crate::smtp::address::EmailAddress::from_text($email).unwrap() }};
}
51
52/// Error to be used with `retry_async` macro
53/// which indicates whether it should be retried or not.
54#[derive(thiserror::Error, Debug)]
55pub enum RetryError {
56    #[error("Permanent error: {0:?}")]
57    Permanent(anyhow::Error),
58    #[error("Transient error: {0:?}")]
59    Transient(anyhow::Error),
60}
61
62/// Downloads the given url to given path.
63/// Destination folder must exist.
64pub fn download_url_to(url: &str, path: &Path) -> Result<u64, Error> {
65    let mut r = reqwest::blocking::get(url).context("unable to perform HTTP request")?;
66    if !r.status().is_success() {
67        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
68    }
69
70    let mut file = File::create(path).context("could not create file")?;
71    Ok(r.copy_to(&mut file)
72        .context("unable to write body to file")?)
73}
74
75/// Downloads the given url and returns it as Bytes
76pub fn download_url(url: &str) -> Result<Bytes, Error> {
77    let r = reqwest::blocking::get(url).context("unable to perform HTTP request")?;
78    if !r.status().is_success() {
79        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
80    }
81
82    Ok(r.bytes().context("unable to fetch file")?)
83}
84
85/// Downloads the given url to given path.
86/// Destination folder must exist.
87pub async fn download_url_to_async(url: &str, path: &Path) -> Result<(), Error> {
88    let r = reqwest::get(url)
89        .await
90        .context("unable to perform HTTP request")?;
91    if !r.status().is_success() {
92        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
93    }
94
95    let mut file = tokio::fs::File::create(path)
96        .await
97        .context("could not create file")?;
98
99    let mut stream = r.bytes_stream();
100    while let Some(v) = stream.next().await {
101        file.write(&v.context("unable to read chunk")?)
102            .await
103            .context("unable to write chunk")?;
104    }
105
106    Ok(())
107}
108
109/// Downloads the given url and returns it as Bytes
110pub async fn download_url_async(url: &str) -> Result<Bytes, Error> {
111    let r = reqwest::get(url)
112        .await
113        .context("unable to perform HTTP request")?;
114
115    if !r.status().is_success() {
116        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
117    }
118
119    Ok(r.bytes().await.context("unable to fetch file")?)
120}
121
/// Retries an async operation with exponential backoff and a small random jitter.
///
/// Retrying async closures/functions holding mutable references is a pain in Rust.
/// So, for now, we'll have to use a macro to work that around.
///
/// # Arguments
///
/// * `$f` - expression producing a future that resolves to
///   `Result<T, RetryError>`. It is re-evaluated for every attempt.
/// * `$timeout` - `Duration` that limits each single attempt and also serves as
///   the overall budget checked after every transient failure.
///   Defaults to 60 seconds.
/// * `$delay` - `Duration` used as the base for the backoff. It is doubled
///   (capped by the remaining budget) and jittered before every sleep.
///   Defaults to 500 milliseconds.
///
/// # Result
///
/// Evaluates to `Result<T, anyhow::Error>`:
/// * `Ok(v)` as soon as an attempt succeeds;
/// * the wrapped error of `RetryError::Permanent` right away;
/// * the last `RetryError::Transient` error once the budget is used up;
/// * a "Timed out" error when a single attempt exceeds `$timeout`.
///
/// The expansion refers to the `tokio`, `rand` and `anyhow` crates by name, so
/// they must be resolvable at the call site.
#[macro_export]
macro_rules! retry_async {
    ($f:expr, $timeout:expr, $delay:expr) => {{
        use rand::{Rng, SeedableRng};
        // SmallRng is Send which we require
        let mut rng = rand::rngs::SmallRng::from_entropy();

        let start = ::std::time::Instant::now();
        // Bind the caller's expressions once so that they are not
        // re-evaluated on every use inside the loop.
        let timeout: ::std::time::Duration = $timeout;
        let mut delay: ::std::time::Duration = $delay;

        let result = loop {
            // Run the function wrapping it into Tokio timeout future so
            // its execution time doesn't exceed our configured limit
            let Ok(res) = tokio::time::timeout(timeout, $f).await else {
                break Err(anyhow::anyhow!("Timed out"));
            };

            let err = match res {
                Ok(v) => break Ok(v),
                Err($crate::RetryError::Permanent(e)) => break Err(e),
                Err($crate::RetryError::Transient(e)) => e,
            };

            // Give up with the last transient error when the budget is spent
            let left = timeout.saturating_sub(start.elapsed());
            if left == ::std::time::Duration::ZERO {
                break Err(err);
            }

            // Exponential backoff, never sleeping longer than the time left
            delay = left.min(delay * 2);
            // Generate a random jitter in 0.0..0.1 range
            let jitter: f64 = rng.r#gen::<f64>() / 10.0;
            let d64 = delay.as_secs_f64();
            // Scales the delay by a factor in the 0.95..1.05 range
            delay = ::std::time::Duration::from_secs_f64(d64.mul_add(0.95, d64 * jitter));
            tokio::time::sleep(delay).await;
        };

        result
    }};

    ($f:expr, $timeout:expr) => {
        $crate::retry_async!($f, $timeout, ::std::time::Duration::from_millis(500))
    };

    ($f:expr) => {
        $crate::retry_async!(
            $f,
            ::std::time::Duration::from_secs(60),
            ::std::time::Duration::from_millis(500)
        )
    };
}
171
/// Emits a `tracing` event whose level is selected at runtime.
///
/// `$level` is an identifier that is matched against the `tracing::Level`
/// constants; all remaining tokens are forwarded unchanged to the `tracing`
/// macro that corresponds to the matched level.
#[macro_export]
macro_rules! dyn_event {
    ($level:ident, $($rest:tt)+) => {
        match $level {
            ::tracing::Level::ERROR => ::tracing::error!($($rest)+),
            ::tracing::Level::WARN => ::tracing::warn!($($rest)+),
            ::tracing::Level::INFO => ::tracing::info!($($rest)+),
            ::tracing::Level::DEBUG => ::tracing::debug!($($rest)+),
            ::tracing::Level::TRACE => ::tracing::trace!($($rest)+),
        }
    };
}
183}