// electrs_client 0.2.9
//
// A client for electrs.
// Documentation
use serde::{Deserialize, Serialize};
use std::{
    fs,
    io::{Read, Write},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::BlockHeight;

/// Errors returned by the cache operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum CacheError {
    /// A filesystem or other I/O operation failed.
    #[error("Io: {0}")]
    Io(#[from] std::io::Error),
    /// Wraps a `postcard::Error` raised while encoding or decoding an entry.
    #[error("Postcard: {0}")]
    Postcard(#[from] postcard::Error),
    /// Wraps a `tokio::task::JoinError` from a spawned blocking task.
    #[error("Tokio join: {0}")]
    Join(#[from] tokio::task::JoinError),
    /// A file in the cache directory has a name that cannot be interpreted
    /// as a block height; the payload is the offending name.
    #[error("Unknown file format: {0}")]
    UnknownFile(String),
    /// The stored last-processed height is not valid UTF-8 or not a valid number.
    #[error("Wrong last processed height")]
    WrongLastProcessedHeight,
}

/// Shorthand for a `Result` whose error type is [`CacheError`].
pub type CacheResult<T> = Result<T, CacheError>;

/// Handle to a directory-backed cache with one file per block height.
///
/// Clones share the same in-memory index because it sits behind an `Arc`.
#[derive(Clone)]
pub struct Cache<T: serde::Serialize> {
    /// Directory that holds the cache files.
    path: String,
    /// Heights currently known to the cache; sorted when the cache is opened.
    storage: Arc<tokio::sync::Mutex<Vec<BlockHeight>>>,
    /// Ties the handle to the payload type `T` without storing a value of it.
    __marker: std::marker::PhantomData<T>,
}

/// A cache entry: block metadata paired with caller-defined data.
///
/// `data` should be extracted from the block that `block` describes; storing
/// it per block is what allows updates to be fetched later.
#[derive(Serialize, Deserialize, Clone)]
// Explicit bound for the derived serde impls, used in place of the bounds serde would infer.
#[serde(bound = "T: serde::de::DeserializeOwned")]
pub struct UpdateCapable<T: serde::Serialize + serde::de::DeserializeOwned> {
    /// Metadata of the block this entry belongs to.
    pub block: crate::BlockMeta,
    /// Payload extracted from the block.
    pub data: T,
}

/// File extension given to every cached block file (`<height>.blob`).
const EXTENSION: &str = "blob";
/// Name of the file, inside the cache directory, that stores the last processed height.
const LAST_PROCESSED_BLOCK: &str = "last_processed_block";

impl<T: serde::Serialize + serde::de::DeserializeOwned> Cache<T> {
    /// Opens the cache directory at `path`, creating it if needed, and indexes
    /// the block heights already stored there.
    ///
    /// Every regular file other than the `last_processed_block` marker must
    /// have a name whose part before the first `.` parses as a
    /// [`BlockHeight`]. Files whose names are not valid UTF-8 are skipped.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Io`] if the directory cannot be created or listed.
    /// * [`CacheError::UnknownFile`] if a file name does not start with a height.
    pub async fn new(path: String) -> CacheResult<Self> {
        tokio::fs::create_dir_all(&path).await?;
        let mut entries = tokio::fs::read_dir(&path).await?;
        let mut files: Vec<BlockHeight> = vec![];

        // Propagate listing errors: stopping silently would leave a partial index.
        while let Some(entry) = entries.next_entry().await? {
            let entry_path = entry.path();
            if !entry_path.is_file() {
                continue;
            }
            let Some(name) = entry_path.file_name().and_then(|s| s.to_str()) else {
                continue;
            };
            if name == LAST_PROCESSED_BLOCK {
                continue;
            }
            // `split` always yields at least one item, so the fallback is never taken.
            let stem = name.split('.').next().unwrap_or(name);
            let height = stem
                .parse::<BlockHeight>()
                .map_err(|_| CacheError::UnknownFile(stem.to_string()))?;
            files.push(height);
        }

        files.sort_unstable();

        Ok(Self {
            path,
            storage: Arc::new(tokio::sync::Mutex::new(files)),
            __marker: std::marker::PhantomData,
        })
    }

    /// Returns the number of heights currently in the index.
    pub async fn len(&self) -> usize {
        self.storage.lock().await.len()
    }

    /// Returns a snapshot copy of the indexed heights.
    pub async fn items(&self) -> Vec<BlockHeight> {
        self.storage.lock().await.clone()
    }

    /// Builds the on-disk path of the blob that stores `block_id`.
    fn blob_path(&self, block_id: BlockHeight) -> PathBuf {
        Path::new(&self.path).join(format!("{block_id}.{EXTENSION}"))
    }

    /// Loads, decompresses and decodes the entry stored for `block_id`.
    ///
    /// The file is read directly; the in-memory index is not consulted.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Io`] if the file cannot be read or decompressed.
    /// * [`CacheError::Join`] if the blocking decompression task fails.
    /// * [`CacheError::Postcard`] if the decompressed bytes do not decode.
    pub async fn read_cache(&self, block_id: BlockHeight) -> CacheResult<UpdateCapable<T>> {
        let mut compressed = vec![];
        tokio::fs::File::open(self.blob_path(block_id))
            .await?
            .read_to_end(&mut compressed)
            .await?;

        // Brotli work is CPU-bound; run it off the async worker threads, as `add` does.
        let raw =
            tokio::task::spawn_blocking(move || Self::decompress_brotli(compressed)).await??;

        Ok(postcard::from_bytes(&raw)?)
    }

    /// Encodes, compresses and stores `data` as the entry for `block_id`,
    /// then appends `block_id` to the index.
    ///
    /// Encoding and compression run before any file is touched, so a failure
    /// there leaves the directory unchanged. The index lock is held from the
    /// continuity check until the height is pushed.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Postcard`] if `data` cannot be encoded.
    /// * [`CacheError::Join`] if the blocking compression task fails.
    /// * [`CacheError::Io`] if compression or the file write fails.
    ///
    /// # Panics
    ///
    /// Panics if the index is non-empty and `block_id` is not exactly one more
    /// than the last indexed height. The check runs before the file is written.
    pub async fn add(&self, block_id: BlockHeight, data: &UpdateCapable<T>) -> CacheResult<()> {
        let serialized = postcard::to_allocvec(data)?;
        let compressed =
            tokio::task::spawn_blocking(move || Self::compress_brotli(serialized)).await??;

        let mut index = self.storage.lock().await;

        if let Some(last) = index.last() {
            // `checked_add` avoids the underflow `block_id - 1` had for height 0.
            if last.checked_add(1) != Some(block_id) {
                panic!("Failed to push same block in the cache");
            }
        }

        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(self.blob_path(block_id))
            .await?;
        file.write_all(&compressed).await?;
        file.flush().await?;

        index.push(block_id);

        Ok(())
    }

    /// Reads the last processed height from the marker file.
    ///
    /// Returns `0` when the marker file does not exist. Surrounding whitespace
    /// in the file (for example a trailing newline) is ignored.
    ///
    /// # Errors
    ///
    /// * [`CacheError::Io`] if the file exists but cannot be read.
    /// * [`CacheError::WrongLastProcessedHeight`] if the content is not valid
    ///   UTF-8 or does not parse as a height.
    pub async fn read_last_processed(&self) -> CacheResult<BlockHeight> {
        let path = Path::new(&self.path).join(LAST_PROCESSED_BLOCK);
        // Read directly and treat NotFound as "nothing processed yet"; this avoids
        // a blocking `exists` call and the gap between checking and reading.
        let bytes = match tokio::fs::read(&path).await {
            Ok(bytes) => bytes,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(0),
            Err(err) => {
                println!("read path: {path:?}");
                return Err(err.into());
            }
        };
        let height = String::from_utf8(bytes).map_err(|_| CacheError::WrongLastProcessedHeight)?;
        height
            .trim()
            .parse()
            .map_err(|_| CacheError::WrongLastProcessedHeight)
    }

    /// Writes `height` as decimal text to the marker file, replacing any
    /// previous content.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if the write fails.
    pub async fn write_last_processed(&self, height: BlockHeight) -> CacheResult<()> {
        let path = Path::new(&self.path).join(LAST_PROCESSED_BLOCK);
        tokio::fs::write(&path, height.to_string())
            .await
            .inspect_err(|_| println!("write path: {path:?}"))?;
        Ok(())
    }

    /// Removes `block_id` from the index and deletes its file.
    ///
    /// Does nothing when `block_id` is not in the index.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if the file cannot be deleted; the height has
    /// already been dropped from the index at that point.
    pub async fn remove(&self, block_id: BlockHeight) -> CacheResult<()> {
        let mut index = self.storage.lock().await;
        let Ok(pos) = index.binary_search(&block_id) else {
            return Ok(());
        };
        index.remove(pos);
        // Release the lock before touching the filesystem.
        drop(index);

        let path = self.blob_path(block_id);
        tokio::fs::remove_file(&path)
            .await
            .inspect_err(|_| println!("rm path: {path:?}"))?;

        Ok(())
    }

    /// Removes the first height in the index and deletes its file.
    ///
    /// Does nothing when the index is empty.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if the file cannot be deleted; the height has
    /// already been dropped from the index at that point.
    pub async fn remove_oldest(&self) -> CacheResult<()> {
        let mut index = self.storage.lock().await;
        if index.is_empty() {
            return Ok(());
        }
        let block_id = index.remove(0);
        // Release the lock before touching the filesystem.
        drop(index);

        let path = self.blob_path(block_id);
        tokio::fs::remove_file(&path)
            .await
            .inspect_err(|_| println!("rm oldest path: {path:?}"))?;
        Ok(())
    }

    /// Compresses `data` with brotli and returns the compressed bytes.
    ///
    /// The writer is a temporary, so it is dropped at the end of the statement;
    /// this relies on the brotli writer finishing the stream when dropped.
    /// NOTE(review): the arguments 4096, 4 and 22 are presumably the buffer
    /// size, quality and window size; confirm against the brotli crate docs.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if writing to the compressor fails.
    pub fn compress_brotli(data: Vec<u8>) -> CacheResult<Vec<u8>> {
        let mut buffer = Vec::new();
        brotli::CompressorWriter::new(&mut buffer, 4096, 4, 22).write_all(&data)?;
        Ok(buffer)
    }

    /// Decompresses a brotli stream held in `buffer`.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if the decompressor reports a read error.
    pub fn decompress_brotli(buffer: Vec<u8>) -> CacheResult<Vec<u8>> {
        let mut decompressed_data = Vec::new();
        brotli::Decompressor::new(&mut std::io::Cursor::new(buffer), 4096)
            .read_to_end(&mut decompressed_data)?;
        Ok(decompressed_data)
    }

    /// Deletes everything inside the cache directory, including the
    /// last-processed marker, using blocking filesystem calls.
    ///
    /// NOTE(review): the in-memory index is not reset here, so `items` and
    /// `len` keep reporting the old heights afterwards; confirm callers
    /// re-create the cache after calling this.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Io`] if listing or deleting an entry fails.
    pub fn clean_all(&self) -> CacheResult<()> {
        clear_dir(PathBuf::from(self.path.clone()))?;

        Ok(())
    }
}

/// Empties the directory at `path` while leaving the directory itself in place.
///
/// Sub-directories are removed recursively and every other entry is removed as
/// a file. Processing stops at the first error, which is returned to the caller.
fn clear_dir<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
    fs::read_dir(&path)?.try_for_each(|child| {
        let target = child?.path();
        if target.is_dir() {
            fs::remove_dir_all(target)
        } else {
            fs::remove_file(target)
        }
    })
}