electrs_client 0.2.9

A client for electrs
Documentation
use cache::Cache;
use itertools::Itertools;
use reqwest::StatusCode;
use std::{collections::VecDeque, io::Read, ops::RangeInclusive};

mod cache;
mod config;
mod errors;
mod has_block_meta;
mod updates;
mod utils;

pub use config::*;
pub use errors::*;
pub use has_block_meta::*;
pub use updates::*;
pub use utils::MoreInfo;

pub use crate::cache::UpdateCapable;

#[derive(Copy, Clone, serde::Deserialize, serde::Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum InscriptionCacheType {
    Content,
    History,
    Market,
    Previewer,
}

impl std::fmt::Display for InscriptionCacheType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let serialized = serde_json::to_string(self)
            .expect("Failed to serialize InscriptionCacheType")
            .replace('"', "");

        write!(f, "{serialized}")
    }
}

pub type BlockHeight = u32;

pub struct Client<T: HasBlockInfo> {
    client: reqwest::Client,
    pub config: Config,
    reorg_cache: Option<Cache<T>>,
}

impl<T: HasBlockInfo> Client<T> {
    /// creates self from config use this if you want to init config by yourself
    /// otherwise you can use `new` or `new_with_prefix`
    pub async fn new_from_cfg(cfg: Config) -> ClientResult<Self> {
        let client = reqwest::Client::new();
        Ok(Self {
            client,
            reorg_cache: Self::init_cache(&cfg).await.transpose()?,
            config: cfg,
        })
    }

    /// creates self from env with prefix "EC_"
    pub async fn new() -> ClientResult<Self> {
        Self::new_with_prefix("EC_").await
    }

    /// creates self from config with any prefix you want
    pub async fn new_with_prefix(prefix: &str) -> ClientResult<Self> {
        let config = envy::prefixed(prefix).from_env::<Config>()?;
        Self::new_from_cfg(config).await
    }

    /// checks that server is alive and ready to work (added to simple health check)
    pub async fn is_alive(&self) -> bool {
        self.get_last_electrs_block_meta().await.is_ok()
    }

    async fn init_cache(cfg: &Config) -> Option<Result<Cache<T>, cache::CacheError>> {
        if let Some(ref path) = cfg.reorgs_path {
            Some(Cache::<T>::new(path.to_string()).await)
        } else {
            None
        }
    }

    /// it's low level api and by using it you lose some checks done by `fetch_updates`
    pub async fn fetch_range(
        &self,
        range: RangeInclusive<BlockHeight>,
    ) -> ClientResult<Vec<UpdateCapable<T>>> {
        let source_url = &self.config.url;
        let from = range.start();
        let to = range.end();
        let mut url = format!(
            "{source_url}/priv/cache?cache_type={}&from={from}&to={to}",
            self.config.source_type
        );
        if let Some(limit) = self.config.limit {
            url.push_str(&format!("&limit={limit}"));
        }

        let mut response = self
            .client
            .execute(
                self.client
                    .post(url)
                    .header("Accept-Encoding", "br")
                    .header("Content-Encoding", "br")
                    .header("Cache-Control", "no-transform")
                    .basic_auth(&self.config.user, Some(&self.config.password))
                    .build()?,
            )
            .await?;

        if !response.status().is_success() {
            let status = response.status();
            let error_message = response.text().await.map_err(|e| {
                ClientError::RequestUnsucces(
                    status,
                    format!("failed to parse body of the request: {e}"),
                )
            })?;

            let err = match status {
                StatusCode::UNAUTHORIZED => ClientError::RequestUnauthorized(error_message),
                code => ClientError::RequestUnsucces(code, error_message),
            };
            return Err(err);
        }

        let mut items = Vec::new();

        let mut buf = VecDeque::new();
        let mut decompress_buffer = vec![];

        let mut len: Option<u64> = None;

        while let Some(chunk) = response.chunk().await? {
            buf.extend(chunk);

            if len.is_none() {
                if buf.len() < 8 {
                    continue;
                }

                let bytes: [u8; 8] = buf.drain(..8).collect_array().unwrap();
                len = Some(u64::from_be_bytes(bytes));
            }

            let chunk = if buf.len() >= len.unwrap() as usize {
                let res = buf.drain(..len.take().unwrap() as usize).collect_vec();
                res
            } else {
                continue;
            };

            let mut cursor = std::io::Cursor::new(chunk);
            let mut reader = brotli::Decompressor::new(&mut cursor, 4096);
            reader
                .read_to_end(&mut decompress_buffer)
                .map_err(|e| ClientError::BrotliDecompress(BrotliDecompress::Io(e)))?;
            let data: T =
                postcard::from_bytes(&decompress_buffer).map_err(ClientError::Postcard)?;
            decompress_buffer.clear();

            let block = data.block_info();
            items.push(UpdateCapable { block, data });
        }

        Ok(items)
    }
}