use crate::RedisPool;
use rand::RngCore;
use serde::Serialize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Clone)]
pub struct Counter {
    count: Arc<AtomicUsize>,
}
impl Counter {
    #[must_use]
    pub fn new(n: usize) -> Self {
        Self {
            count: Arc::new(AtomicUsize::new(n)),
        }
    }
    #[must_use]
    pub fn value(&self) -> usize {
        self.count.load(Ordering::SeqCst)
    }
    pub fn decrby(&self, n: usize) {
        self.count.fetch_sub(n, Ordering::SeqCst);
    }
    pub fn incrby(&self, n: usize) {
        self.count.fetch_add(n, Ordering::SeqCst);
    }
}
struct ProcessStats {
    rtt_us: String,
    quiet: bool,
    busy: usize,
    beat: chrono::DateTime<chrono::Utc>,
    info: ProcessInfo,
    rss: String,
}
#[derive(Serialize)]
struct ProcessInfo {
    hostname: String,
    identity: String,
    started_at: f64,
    pid: u32,
    tag: Option<String>,
    concurrency: usize,
    queues: Vec<String>,
    labels: Vec<String>,
}
pub struct StatsPublisher {
    hostname: String,
    identity: String,
    queues: Vec<String>,
    started_at: chrono::DateTime<chrono::Utc>,
    busy_jobs: Counter,
}
fn generate_identity(hostname: &String) -> String {
    let pid = std::process::id();
    let mut bytes = [0u8; 12];
    rand::thread_rng().fill_bytes(&mut bytes);
    let nonce = hex::encode(bytes);
    format!("{hostname}:{pid}:{nonce}")
}
impl StatsPublisher {
    #[must_use]
    pub fn new(hostname: String, queues: Vec<String>, busy_jobs: Counter) -> Self {
        let identity = generate_identity(&hostname);
        let started_at = chrono::Utc::now();
        Self {
            hostname,
            identity,
            queues,
            started_at,
            busy_jobs,
        }
    }
    pub async fn publish_stats(&self, redis: RedisPool) -> Result<(), Box<dyn std::error::Error>> {
        let stats = self.create_process_stats().await?;
        let mut conn = redis.get().await?;
        conn.cmd_with_key("HSET", self.identity.clone())
            .arg("rss")
            .arg(stats.rss)
            .arg("rtt_us")
            .arg(stats.rtt_us)
            .arg("busy")
            .arg(stats.busy)
            .arg("quiet")
            .arg(stats.quiet)
            .arg("beat")
            .arg(stats.beat.timestamp())
            .arg("info")
            .arg(serde_json::to_string(&stats.info)?)
            .query_async(conn.unnamespaced_borrow_mut())
            .await?;
        conn.expire(self.identity.clone(), 30).await?;
        conn.sadd("processes".to_string(), self.identity.clone())
            .await?;
        Ok(())
    }
    async fn create_process_stats(&self) -> Result<ProcessStats, Box<dyn std::error::Error>> {
        let rss_in_kb = format!(
            "{}",
            simple_process_stats::ProcessStats::get()
                .await?
                .memory_usage_bytes
                / 1024
        );
        Ok(ProcessStats {
            rtt_us: "0".into(),
            busy: self.busy_jobs.value(),
            quiet: false,
            rss: rss_in_kb,
            beat: chrono::Utc::now(),
            info: ProcessInfo {
                concurrency: num_cpus::get(),
                hostname: self.hostname.clone(),
                identity: self.identity.clone(),
                queues: self.queues.clone(),
                started_at: self.started_at.clone().timestamp() as f64,
                pid: std::process::id(),
                labels: vec![],
                tag: None,
            },
        })
    }
}