ckb-tx-pool 1.2.2

CKB Tx-pool stores transactions, which is designed for CKB Two Step Transaction Confirmation mechanism
Documentation
use crate::error::Reject;
use ckb_db::DBWithTTL;
use ckb_error::{AnyError, OtherError};
use ckb_types::{packed::Byte32, prelude::*};
use rand::distributions::Uniform;
use rand::{Rng, thread_rng};
use std::path::Path;

const DEFAULT_SHARDS: u32 = 5;

#[derive(Debug)]
pub struct RecentReject {
    ttl: i32,
    shard_num: u32,
    pub(crate) count_limit: u64,
    pub(crate) total_keys_num: u64,
    pub(crate) db: DBWithTTL,
}

impl RecentReject {
    pub fn new<P>(path: P, count_limit: u64, ttl: i32) -> Result<RecentReject, AnyError>
    where
        P: AsRef<Path>,
    {
        Self::build(path, DEFAULT_SHARDS, count_limit, ttl)
    }

    pub(crate) fn build<P>(
        path: P,
        shard_num: u32,
        count_limit: u64,
        ttl: i32,
    ) -> Result<RecentReject, AnyError>
    where
        P: AsRef<Path>,
    {
        let cf_names: Vec<_> = (0..shard_num).map(|c| c.to_string()).collect();
        let db = DBWithTTL::open_cf(path, cf_names.clone(), ttl)?;
        let estimate_keys_num = cf_names
            .iter()
            .map(|cf| db.estimate_num_keys_cf(cf))
            .collect::<Result<Vec<_>, _>>()?;

        let total_keys_num = Self::checked_estimate_sum(&estimate_keys_num)?;

        Ok(RecentReject {
            shard_num,
            count_limit,
            ttl,
            db,
            total_keys_num,
        })
    }

    pub fn put(&mut self, hash: &Byte32, reject: Reject) -> Result<(), AnyError> {
        let hash_slice = hash.as_slice();
        let shard = self.get_shard(hash_slice).to_string();
        let reject: ckb_jsonrpc_types::PoolTransactionReject = reject.into();
        let json_string = serde_json::to_string(&reject)?;
        self.db.put(&shard, hash_slice, json_string)?;

        if let Some(total_keys_num) = self.total_keys_num.checked_add(1) {
            if total_keys_num > self.count_limit {
                self.shrink()?;
            }
        } else {
            // overflow occurred, try shrink
            self.shrink()?;
        }
        Ok(())
    }

    pub fn get(&self, hash: &Byte32) -> Result<Option<String>, AnyError> {
        let slice = hash.as_slice();
        let shard = self.get_shard(slice).to_string();
        let ret = self.db.get_pinned(&shard, slice)?;
        Ok(ret.map(|bytes| unsafe { String::from_utf8_unchecked(bytes.to_vec()) }))
    }

    pub fn get_estimate_total_keys_num(&self) -> u64 {
        self.total_keys_num
    }

    fn estimate_total_keys_num(&self) -> Result<u64, AnyError> {
        let estimate_keys_num = (0..self.shard_num)
            .map(|num| self.db.estimate_num_keys_cf(&num.to_string()))
            .collect::<Result<Vec<_>, _>>()?;

        Self::checked_estimate_sum(&estimate_keys_num).map_err(Into::into)
    }

    fn checked_estimate_sum(estimate_keys_num: &[Option<u64>]) -> Result<u64, OtherError> {
        estimate_keys_num.iter().try_fold(0u64, |total, num| {
            let keys_num = num.unwrap_or(0);
            total.checked_add(keys_num).ok_or_else(|| {
                OtherError::new(format!(
                    "recent reject estimated keys count overflows: {} + {}",
                    total, keys_num
                ))
            })
        })
    }

    fn shrink(&mut self) -> Result<u64, AnyError> {
        let mut rng = thread_rng();
        let shard = rng.sample(Uniform::new(0, self.shard_num)).to_string();
        self.db.drop_cf(&shard)?;
        self.db.create_cf_with_ttl(&shard, self.ttl)?;

        let total_keys_num = self.estimate_total_keys_num()?;
        self.total_keys_num = total_keys_num;
        Ok(total_keys_num)
    }

    fn get_shard(&self, hash: &[u8]) -> u32 {
        let mut low_u32 = [0u8; 4];
        low_u32.copy_from_slice(&hash[0..4]);
        u32::from_le_bytes(low_u32) % self.shard_num
    }
}