electrs_client 0.2.9

A client for electrs
Documentation
#![allow(clippy::collapsible_if)]

use itertools::Itertools;

use crate::{errors::ClientResult, BlockHeight, ClientError, HasBlockInfo, MoreInfo};

use super::{Client, Update};

// NOTE: one block should keeped always to get meta and etc.
// that's correct in case if you use `fetch_updates_from_reorgs`
pub const REORG_CACHE_SIZE: usize = 31;

impl<T: HasBlockInfo> Client<T> {
    /// for docs check `fetch_updates_from_cache_starting_from`
    /// this is same function but with start_height = 0
    pub async fn fetch_updates_from_cache(&self) -> ClientResult<Vec<Update<T>>> {
        self.fetch_updates_from_cache_starting_from(0).await
    }

    /// Almost all docs from `fetch_updates` aplied to this, but there is some differences:
    /// 1: this function uses cache (which should be processed or error)
    /// 2: this function not returns `Update::RemoveBlock` instead it returns `Update::RemoveCachedBlock` which contains not only height but also block itself which allows to perform more reorg stuff (and allows you to not store block by yourself if you need it)
    /// 3: here you should set `reorg_cache` or error
    ///
    /// So, now to the reason why you should marke blocks as processed.
    /// It's simple: when you restart program or thread dies and cache keeped how do you know which cache are
    /// should be processed and which not.
    ///
    /// Also about start height. Client begin fetching from this point if there is no cached blocks present.
    pub async fn fetch_updates_from_cache_starting_from(
        &self,
        start_height: BlockHeight,
    ) -> ClientResult<Vec<Update<T>>> {
        if self.reorg_cache.is_none() {
            return Err(ClientError::CacheNotInited);
        }

        let cache = self.reorg_cache.as_ref().unwrap();

        let cache_len = cache.len().await;
        if cache_len > REORG_CACHE_SIZE {
            return Err(ClientError::CacheNotProcessed(cache_len));
        }

        let mut metas = Vec::new();

        for i in cache.items().await {
            metas.push(cache.read_cache(i).await?.block);
        }

        // to be able to start from height
        if metas.is_empty() {
            metas = vec![
                self.get_electrs_block_meta(start_height.saturating_sub(1))
                    .await?,
            ];
        };

        // idea here is that remove is completely new batch which prevents loop from
        // remove next add update or in oter word yet anoter time sucking because of async
        let upds_batch = self
            .fetch_updates(&metas)
            .await
            .err_log()?
            .into_iter()
            .fold(Vec::<Vec<Update<T>>>::new(), |mut acc, v| {
                match v {
                    Update::AddBlock { .. } => {
                        if acc
                            .last()
                            .and_then(|v| v.first())
                            .is_some_and(|v| v.is_remove())
                        {
                            acc.push(vec![v]);
                        } else if let Some(acc) = acc.last_mut() {
                            acc.push(v);
                        } else {
                            acc.push(vec![v]);
                        }
                    }
                    Update::RemoveBlock { .. } => acc.push(vec![v]),
                }
                acc
            });

        let mut res = vec![];
        for update in upds_batch.into_iter().flatten() {
            match update {
                Update::AddBlock(update) => {
                    cache.add(update.block.height, &update).await.err_log()?;

                    let items = cache.items().await;
                    if items.len() > REORG_CACHE_SIZE {
                        if cache.read_last_processed().await.err_log()?
                            >= *items.first().unwrap_or(&0)
                        {
                            cache.remove_oldest().await?
                        }
                    }

                    res.push(Update::AddBlock(update));
                }
                Update::RemoveBlock { height, .. } => {
                    let block = cache.read_cache(height).await.err_log()?;
                    cache.remove(height).await.err_log()?;
                    res.push(Update::RemoveBlock {
                        height,
                        block: Some(block),
                    });
                }
            }
        }

        Ok(res)
    }

    /// marks cache block as processed and removes cache file if it exceeds reorg buffer
    /// for the reason why it shold bother you go to `fetch_updates_from_cache_starting_from` docs
    pub async fn mark_as_processed(&self, height: BlockHeight) -> ClientResult<()> {
        if let Some(ref cache) = self.reorg_cache {
            if cache.len().await > REORG_CACHE_SIZE {
                cache.remove(height).await.err_log()?;
            }
            cache.write_last_processed(height).await.err_log()?;
        };

        Ok(())
    }

    /// returns vec of cached blocks which not processed as updates to handle
    pub async fn get_unprocessed_blocks_as_updates(&self) -> ClientResult<Vec<Update<T>>> {
        if let Some(ref cache) = self.reorg_cache {
            let mut res = Vec::new();

            for i in self.get_unprocessed().await? {
                res.push(Update::AddBlock(cache.read_cache(i).await?));
            }

            Ok(res)
        } else {
            Ok(vec![])
        }
    }

    /// returns vec of cached blocks which not processed
    pub async fn get_unprocessed(&self) -> ClientResult<Vec<BlockHeight>> {
        if let Some(ref cache) = self.reorg_cache {
            let lp = cache.read_last_processed().await.err_log()?;
            return Ok(cache
                .items()
                .await
                .into_iter()
                .filter(|v| *v > lp)
                .collect_vec());
        };

        Ok(vec![])
    }

    /// returns vec of cached blocks
    pub async fn get_cached(&self) -> ClientResult<Vec<BlockHeight>> {
        if let Some(ref cache) = self.reorg_cache {
            return Ok(cache.items().await);
        };

        Ok(vec![])
    }

    pub async fn clean_cache(&self) -> ClientResult<()> {
        if let Some(ref cache) = self.reorg_cache {
            cache.clean_all()?;
        }

        Ok(())
    }
}