Skip to main content

ic_bn_lib/
lib.rs

1// Needed for certain macros
2#![recursion_limit = "256"]
3#![warn(clippy::nursery)]
4#![warn(tail_expr_drop_order)]
5#![allow(clippy::cognitive_complexity)]
6#![allow(clippy::field_reassign_with_default)]
7
8#[cfg(feature = "custom-domains")]
9pub mod custom_domains;
10pub mod http;
11pub mod pubsub;
12pub mod tasks;
13pub mod tests;
14pub mod tls;
15pub mod utils;
16#[cfg(feature = "vector")]
17pub mod vector;
18
19use std::{fs::File, path::Path};
20
21use anyhow::{Context, anyhow};
22use bytes::Bytes;
23use futures::StreamExt;
24use ic_bn_lib_common::Error;
25use tokio::io::AsyncWriteExt;
26
27pub use hyper;
28pub use hyper_util;
29pub use ic_agent;
30pub use ic_bn_lib_common;
31pub use prometheus;
32pub use reqwest;
33pub use rustls;
34#[cfg(feature = "acme-alpn")]
35pub use rustls_acme;
36pub use uuid;
37
38/// Error to be used with `retry_async` macro
39/// which indicates whether it should be retried or not.
40#[derive(thiserror::Error, Debug)]
41pub enum RetryError {
42    #[error("Permanent error: {0:?}")]
43    Permanent(anyhow::Error),
44    #[error("Transient error: {0:?}")]
45    Transient(anyhow::Error),
46}
47
48/// Downloads the given url to given path.
49/// Destination folder must exist.
50pub fn download_url_to(url: &str, path: &Path) -> Result<u64, Error> {
51    let mut r = reqwest::blocking::get(url).context("unable to perform HTTP request")?;
52    if !r.status().is_success() {
53        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
54    }
55
56    let mut file = File::create(path).context("could not create file")?;
57    Ok(r.copy_to(&mut file)
58        .context("unable to write body to file")?)
59}
60
61/// Downloads the given url and returns it as Bytes
62pub fn download_url(url: &str) -> Result<Bytes, Error> {
63    let r = reqwest::blocking::get(url).context("unable to perform HTTP request")?;
64    if !r.status().is_success() {
65        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
66    }
67
68    Ok(r.bytes().context("unable to fetch file")?)
69}
70
71/// Downloads the given url to given path.
72/// Destination folder must exist.
73pub async fn download_url_to_async(url: &str, path: &Path) -> Result<(), Error> {
74    let r = reqwest::get(url)
75        .await
76        .context("unable to perform HTTP request")?;
77    if !r.status().is_success() {
78        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
79    }
80
81    let mut file = tokio::fs::File::create(path)
82        .await
83        .context("could not create file")?;
84
85    let mut stream = r.bytes_stream();
86    while let Some(v) = stream.next().await {
87        file.write(&v.context("unable to read chunk")?)
88            .await
89            .context("unable to write chunk")?;
90    }
91
92    Ok(())
93}
94
95/// Downloads the given url and returns it as Bytes
96pub async fn download_url_async(url: &str) -> Result<Bytes, Error> {
97    let r = reqwest::get(url)
98        .await
99        .context("unable to perform HTTP request")?;
100
101    if !r.status().is_success() {
102        return Err(anyhow!("incorrect HTTP code: {}", r.status()).into());
103    }
104
105    Ok(r.bytes().await.context("unable to fetch file")?)
106}
107
/// Retrying async closures/functions holding mutable references is a pain in Rust.
/// So, for now, we'll have to use a macro to work that around.
///
/// Forms:
/// - `retry_async!(f, timeout, delay)`
/// - `retry_async!(f, timeout)` - uses a delay of 500ms
/// - `retry_async!(f)` - uses a timeout of 60s and a delay of 500ms
///
/// `f` is an expression producing a future that resolves to `Result<T, RetryError>`,
/// it is evaluated anew for each attempt. `timeout` and `delay` are `Duration`s.
///
/// Each attempt is limited by `timeout`. When the attempt fails with
/// `RetryError::Transient` it is retried after a pause for as long as the time
/// elapsed since the start is below `timeout`. The pause is doubled before each
/// retry (capped by the time left) and then scaled by a random factor in 0.95..1.05 range.
///
/// Evaluates to `Result<T, anyhow::Error>`:
/// - `Ok` with the value of the first successful attempt
/// - `Err` when an attempt times out, fails with `RetryError::Permanent`
///   or fails with `RetryError::Transient` when no time is left
///
/// The expansion refers to the `rand`, `tokio` and `anyhow` crates by name,
/// so they need to be available in the calling crate.
#[macro_export]
macro_rules! retry_async {
    ($f:expr, $timeout:expr, $delay:expr) => {{
        use rand::{Rng, SeedableRng};
        // SmallRng is Send which we require
        let mut rng = rand::rngs::SmallRng::from_entropy();

        // Evaluate the timeout expression only once
        let timeout: std::time::Duration = $timeout;
        let start = std::time::Instant::now();
        let mut delay: std::time::Duration = $delay;

        let result = loop {
            // Run the function wrapping it into Tokio timeout future so
            // its execution time doesn't exceed our configured limit
            let Ok(res) = tokio::time::timeout(timeout, $f).await else {
                break Err(anyhow::anyhow!("Timed out"));
            };

            let err = match res {
                Ok(v) => break Ok(v),
                Err($crate::RetryError::Permanent(e)) => break Err(e),
                Err($crate::RetryError::Transient(e)) => e,
            };

            // Give up with the last transient error if the time budget is exhausted
            let left = timeout.saturating_sub(start.elapsed());
            if left == std::time::Duration::ZERO {
                break Err(err);
            }

            // Back off exponentially, but never plan to sleep longer than the time left
            delay = left.min(delay * 2);
            // Generate a random jitter in 0.0..0.1 range
            let jitter: f64 = rng.r#gen::<f64>() / 10.0;
            let d64 = delay.as_secs_f64();
            // Use a fully qualified path so that the caller doesn't have to import `Duration`
            delay = std::time::Duration::from_secs_f64(d64.mul_add(0.95, d64 * jitter));
            tokio::time::sleep(delay).await;
        };

        result
    }};

    ($f:expr, $timeout:expr) => {
        $crate::retry_async!($f, $timeout, std::time::Duration::from_millis(500))
    };

    ($f:expr) => {
        $crate::retry_async!(
            $f,
            std::time::Duration::from_secs(60),
            std::time::Duration::from_millis(500)
        )
    };
}
157
/// Emits a `tracing` event with a level that is selected at runtime.
///
/// The first argument is an identifier holding a `tracing::Level`,
/// the remaining tokens are passed unchanged to the `tracing` macro
/// matching that level (`trace!`, `debug!`, `info!`, `warn!` or `error!`).
#[macro_export]
macro_rules! dyn_event {
    ($level:ident, $($tokens:tt)+) => {
        match $level {
            ::tracing::Level::TRACE => {
                ::tracing::trace!($($tokens)+)
            }
            ::tracing::Level::DEBUG => {
                ::tracing::debug!($($tokens)+)
            }
            ::tracing::Level::INFO => {
                ::tracing::info!($($tokens)+)
            }
            ::tracing::Level::WARN => {
                ::tracing::warn!($($tokens)+)
            }
            ::tracing::Level::ERROR => {
                ::tracing::error!($($tokens)+)
            }
        }
    };
}
169}