use lazy_static::lazy_static;
use prometheus::{
    core::Desc,
    core::{Collector, Opts},
    proto, Counter, IntCounter, IntGauge,
};
use std::sync::Mutex;
use tokio;
use tokio_metrics::RuntimeIntervals;
use tokio_metrics::RuntimeMetrics as RuntimeMetricsData;
use tokio_metrics::RuntimeMonitor;
const METRICS_COUNT: usize = 15;
#[derive(Debug)]
struct RuntimeMetrics {
    workers_count: IntGauge,
    total_park_count: IntCounter,
    total_noop_count: IntCounter,
    total_steal_count: IntCounter,
    total_steal_operations: IntCounter,
    num_remote_schedules: IntCounter,
    total_local_schedule_count: IntCounter,
    total_overflow_count: IntCounter,
    total_polls_count: IntCounter,
    total_busy_duration: Counter,
    injection_queue_depth: IntGauge,
    total_local_queue_depth: IntGauge,
    elapsed: Counter,
    budget_forced_yield_count: IntCounter,
    io_driver_ready_count: IntCounter,
}
impl RuntimeMetrics {
    fn new<S: Into<String>>(namespace: S) -> Self {
        let namespace = namespace.into();
        let workers_count = IntGauge::with_opts(
            Opts::new(
                "tokio_workers_count",
                r#"The number of worker threads used by the runtime."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_park_count = IntCounter::with_opts(
            Opts::new(
                "tokio_total_park_count",
                r#"The number of times worker threads parked."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_noop_count = IntCounter::with_opts(
            Opts::new(
                "tokio_total_noop_count",
                r#"The number of times worker threads unparked but performed no work before parking again."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_steal_count = IntCounter::with_opts(
            Opts::new(
                "tokio_total_steal_count",
                r#"The number of tasks worker threads stole from another worker thread."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_steal_operations = IntCounter::with_opts(
            Opts::new(
                "tokio_total_seal_operations",
                r#"The number of times worker threads stole tasks from another worker thread."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let num_remote_schedules = IntCounter::with_opts(
            Opts::new(
                "tokio_num_remote_schedules",
                r#"The number of tasks scheduled from outside of the runtime."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_local_schedule_count = IntCounter::with_opts(
            Opts::new(
                "tokio_total_local_schedule_count",
                r#"The number of tasks scheduled from worker threads."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_overflow_count = IntCounter::with_opts(
            Opts::new(
                "tokio_total_overflow_count",
                r#"The number of times worker threads saturated their local queues."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_polls_count = IntCounter::with_opts(
            Opts::new(
                "tokio_total_polls_count",
                r#"The number of tasks that have been polled across all worker threads."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_busy_duration = Counter::with_opts(
            Opts::new(
                "tokio_total_busy_duration",
                r#"The amount of time worker threads were busy."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let injection_queue_depth = IntGauge::with_opts(
            Opts::new(
                "tokio_injection_queue_depth",
                r#"The number of tasks currently scheduled in the runtime’s injection queue."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let total_local_queue_depth = IntGauge::with_opts(
            Opts::new(
                "tokio_total_local_queue_depth",
                r#"The total number of tasks currently scheduled in workers’ local queues."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let elapsed = Counter::with_opts(
            Opts::new(
                "tokio_elapsed",
                r#"Total amount of time elapsed since observing runtime metrics."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        let budget_forced_yield_count = IntCounter::with_opts(
            Opts::new(
                "tokio_budget_forced_yield_count",
                r#"Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."#
            )
                .namespace(namespace.clone()),
        )
        .unwrap();
        let io_driver_ready_count = IntCounter::with_opts(
            Opts::new(
                "tokio_io_driver_ready_count",
                r#"Returns the number of ready events processed by the runtime’s I/O driver."#,
            )
            .namespace(namespace.clone()),
        )
        .unwrap();
        Self {
            workers_count,
            total_park_count,
            total_noop_count,
            total_steal_count,
            total_steal_operations,
            num_remote_schedules,
            total_local_schedule_count,
            total_overflow_count,
            total_polls_count,
            total_busy_duration,
            injection_queue_depth,
            total_local_queue_depth,
            elapsed,
            budget_forced_yield_count,
            io_driver_ready_count,
        }
    }
    fn update(&self, data: RuntimeMetricsData) {
        macro_rules! update_counter {
            ( $field:ident, "int" ) => {{
                let new = data.$field as u64;
                self.$field.inc_by(new);
            }};
            ( $field:ident, "duration" ) => {{
                let new = data.$field.as_secs_f64();
                self.$field.inc_by(new);
            }};
        }
        self.workers_count.set(data.workers_count as i64);
        update_counter!(total_park_count, "int");
        update_counter!(total_noop_count, "int");
        update_counter!(total_steal_count, "int");
        update_counter!(total_steal_operations, "int");
        update_counter!(num_remote_schedules, "int");
        update_counter!(total_local_schedule_count, "int");
        update_counter!(total_overflow_count, "int");
        update_counter!(total_polls_count, "int");
        update_counter!(total_busy_duration, "duration");
        self.injection_queue_depth
            .set(data.injection_queue_depth as i64);
        self.total_local_queue_depth
            .set(data.total_local_queue_depth as i64);
        update_counter!(elapsed, "duration");
        update_counter!(budget_forced_yield_count, "int");
        update_counter!(io_driver_ready_count, "int");
    }
    fn to_desc(&self) -> Vec<&Desc> {
        let mut desc = vec![];
        desc.extend(self.workers_count.desc());
        desc.extend(self.total_park_count.desc());
        desc.extend(self.total_noop_count.desc());
        desc.extend(self.total_steal_count.desc());
        desc.extend(self.total_steal_operations.desc());
        desc.extend(self.num_remote_schedules.desc());
        desc.extend(self.total_local_schedule_count.desc());
        desc.extend(self.total_overflow_count.desc());
        desc.extend(self.total_polls_count.desc());
        desc.extend(self.total_busy_duration.desc());
        desc.extend(self.injection_queue_depth.desc());
        desc.extend(self.total_local_queue_depth.desc());
        desc.extend(self.elapsed.desc());
        desc.extend(self.budget_forced_yield_count.desc());
        desc.extend(self.io_driver_ready_count.desc());
        debug_assert_eq!(desc.len(), METRICS_COUNT);
        desc
    }
    fn to_metrics(&self) -> Vec<proto::MetricFamily> {
        let mut metrics = vec![];
        metrics.extend(self.workers_count.collect());
        metrics.extend(self.total_park_count.collect());
        metrics.extend(self.total_noop_count.collect());
        metrics.extend(self.total_steal_count.collect());
        metrics.extend(self.total_steal_operations.collect());
        metrics.extend(self.num_remote_schedules.collect());
        metrics.extend(self.total_local_schedule_count.collect());
        metrics.extend(self.total_overflow_count.collect());
        metrics.extend(self.total_polls_count.collect());
        metrics.extend(self.total_busy_duration.collect());
        metrics.extend(self.injection_queue_depth.collect());
        metrics.extend(self.total_local_queue_depth.collect());
        metrics.extend(self.elapsed.collect());
        metrics.extend(self.budget_forced_yield_count.collect());
        metrics.extend(self.io_driver_ready_count.collect());
        debug_assert_eq!(metrics.len(), METRICS_COUNT);
        metrics
    }
}
pub struct RuntimeCollector {
    metrics: RuntimeMetrics,
    producer: Mutex<RuntimeIntervals>,
}
impl RuntimeCollector {
    pub fn new<S: Into<String>>(monitor: RuntimeMonitor, namespace: S) -> Self {
        let producer = Mutex::new(monitor.intervals());
        let metrics = RuntimeMetrics::new(namespace);
        Self { metrics, producer }
    }
    fn get_metrics_data(&self) -> RuntimeMetricsData {
        let data = self.producer.lock().unwrap().next().unwrap();
        data
    }
}
impl Default for RuntimeCollector {
    fn default() -> Self {
        let handle = tokio::runtime::Handle::current();
        let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
        Self::new(runtime_monitor, "")
    }
}
impl Collector for RuntimeCollector {
    fn desc(&self) -> Vec<&Desc> {
        self.metrics.to_desc()
    }
    fn collect(&self) -> Vec<proto::MetricFamily> {
        let data = self.get_metrics_data();
        self.metrics.update(data);
        self.metrics.to_metrics()
    }
}
impl Collector for &RuntimeCollector {
    fn desc(&self) -> Vec<&Desc> {
        self.metrics.to_desc()
    }
    fn collect(&self) -> Vec<proto::MetricFamily> {
        let data = self.get_metrics_data();
        self.metrics.update(data);
        self.metrics.to_metrics()
    }
}
lazy_static! {
    static ref DEFAULT_COLLECTOR: RuntimeCollector = {
        let collector = RuntimeCollector::default();
        collector
    };
}
pub fn default_collector() -> &'static RuntimeCollector {
    lazy_static::initialize(&DEFAULT_COLLECTOR);
    &DEFAULT_COLLECTOR
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::runtime;
    #[test]
    fn test_runtime_collector_descs() {
        let rt = runtime::Builder::new_current_thread().build().unwrap();
        let mt = tokio_metrics::RuntimeMonitor::new(&rt.handle());
        let rc = RuntimeCollector::new(mt, "");
        let descs = rc.desc();
        assert_eq!(descs.len(), METRICS_COUNT);
        assert_eq!(descs[0].fq_name, "tokio_workers_count".to_string());
        assert_eq!(
            descs[0].help,
            "The number of worker threads used by the runtime."
        );
        assert_eq!(descs[0].variable_labels.len(), 0);
    }
    #[test]
    fn test_runtime_collector_metrics() {
        let rt = runtime::Builder::new_current_thread().build().unwrap();
        let mt = tokio_metrics::RuntimeMonitor::new(&rt.handle());
        let rc = RuntimeCollector::new(mt, "");
        let metrics = rc.collect();
        assert_eq!(metrics.len(), METRICS_COUNT);
        assert_eq!(metrics[0].get_name(), "tokio_workers_count");
        assert_eq!(
            metrics[0].get_help(),
            "The number of worker threads used by the runtime."
        );
        assert_eq!(metrics[0].get_metric().len(), 1);
        assert_eq!(metrics[0].get_metric()[0].get_gauge().get_value(), 1.0);
    }
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_default() {
        let collector = default_collector();
        assert_eq!(collector.desc().len(), METRICS_COUNT);
        let metrics = collector.collect();
        assert_eq!(metrics.len(), METRICS_COUNT);
        assert_eq!(metrics[0].get_name(), "tokio_workers_count");
        assert_eq!(
            metrics[0].get_help(),
            "The number of worker threads used by the runtime."
        );
        assert_eq!(metrics[0].get_metric().len(), 1);
        assert_eq!(metrics[0].get_metric()[0].get_gauge().get_value(), 8.0);
    }
    #[tokio::test]
    async fn test_integrated_with_prometheus() {
        use prometheus::Encoder;
        let tc = RuntimeCollector::default();
        prometheus::default_registry()
            .register(Box::new(tc))
            .unwrap();
        let encoder = prometheus::TextEncoder::new();
        let mut buffer = Vec::new();
        encoder
            .encode(&prometheus::default_registry().gather(), &mut buffer)
            .expect("Failed to encode");
        String::from_utf8(buffer.clone()).expect("Failed to convert to string.");
    }
    #[test]
    fn test_send() {
        fn test<C: Send>() {}
        test::<DEFAULT_COLLECTOR>();
    }
    #[test]
    fn test_sync() {
        fn test<C: Sync>() {}
        test::<DEFAULT_COLLECTOR>();
    }
}