sentinel-core 0.1.3

The flow sentinel of your microservices.
Documentation
use super::{BucketLeapArray, SlidingWindowMetric};
use crate::{
    base::{
        ConcurrencyStat, MetricEvent, MetricItem, MetricItemRetriever, ReadStat, ResourceType,
        StatNode, TimePredicate, WriteStat,
    },
    config, Result,
};
use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};

#[allow(dead_code)]
#[derive(Debug)]
pub struct ResourceNode {
    pub(crate) res_name: String,
    pub(crate) resource_type: ResourceType,
    pub(crate) sample_count: u32,
    pub(crate) interval_ms: u32,
    pub(crate) concurrency: AtomicU32,
    pub(crate) arr: Arc<BucketLeapArray>,
    pub(crate) metric: Arc<SlidingWindowMetric>,
}

impl ResourceNode {
    pub fn new(res_name: String, resource_type: ResourceType) -> Self {
        let arr = Arc::new(
            BucketLeapArray::new(
                config::global_stat_sample_count_total(),
                config::global_stat_interval_ms_total(),
            )
            .unwrap(),
        );
        let sample_count = config::metric_stat_sample_count();
        let interval_ms = config::metric_stat_interval_ms();
        let metric =
            Arc::new(SlidingWindowMetric::new(sample_count, interval_ms, arr.clone()).unwrap());
        ResourceNode {
            res_name,
            resource_type,
            sample_count,
            interval_ms,
            concurrency: AtomicU32::new(0),
            arr,
            metric,
        }
    }

    pub fn default_metric(&self) -> Arc<dyn ReadStat> {
        self.metric.clone()
    }

    pub fn max_avg(&self, event: MetricEvent) -> f64 {
        self.metric.max_of_single_bucket(event) as f64 * self.sample_count as f64
            / self.interval_ms as f64
            * 1000f64
    }

    pub fn max_concurrency(&self) -> u32 {
        self.metric.max_concurrency()
    }
}

impl MetricItemRetriever for ResourceNode {
    fn metrics_on_condition(&self, predicate: &TimePredicate) -> Vec<MetricItem> {
        self.metric.second_metrics_on_condition(predicate)
    }
}

impl ReadStat for ResourceNode {
    fn qps(&self, event: MetricEvent) -> f64 {
        self.metric.qps(event)
    }
    fn qps_previous(&self, event: MetricEvent) -> f64 {
        self.metric.qps_previous(event)
    }
    fn sum(&self, event: MetricEvent) -> u64 {
        self.metric.sum(event)
    }
    fn min_rt(&self) -> f64 {
        self.metric.min_rt()
    }
    fn avg_rt(&self) -> f64 {
        self.metric.avg_rt()
    }
}

impl WriteStat for ResourceNode {
    fn add_count(&self, event: MetricEvent, count: u64) {
        self.arr.add_count(event, count);
    }

    fn update_concurrency(&self, concurrency: u32) {
        self.arr.update_concurrency(concurrency);
    }
}

impl ConcurrencyStat for ResourceNode {
    fn current_concurrency(&self) -> u32 {
        self.concurrency.load(Ordering::SeqCst)
    }

    fn increase_concurrency(&self) {
        self.arr
            .update_concurrency(self.concurrency.fetch_add(1, Ordering::SeqCst) + 1)
    }

    fn decrease_concurrency(&self) {
        self.concurrency.fetch_sub(1, Ordering::SeqCst);
    }
}

impl StatNode for ResourceNode {
    fn generate_read_stat(&self, sample_count: u32, interval_ms: u32) -> Result<Arc<dyn ReadStat>> {
        let stat = SlidingWindowMetric::new(sample_count, interval_ms, self.arr.clone())?;
        Ok(Arc::new(stat))
    }
}