sidekiq/
stats.rs

1use crate::RedisPool;
2use rand::RngCore;
3use serde::Serialize;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6
7#[derive(Clone)]
8pub struct Counter {
9    count: Arc<AtomicUsize>,
10}
11
12impl Counter {
13    #[must_use]
14    pub fn new(n: usize) -> Self {
15        Self {
16            count: Arc::new(AtomicUsize::new(n)),
17        }
18    }
19
20    #[must_use]
21    pub fn value(&self) -> usize {
22        self.count.load(Ordering::SeqCst)
23    }
24
25    pub fn decrby(&self, n: usize) {
26        self.count.fetch_sub(n, Ordering::SeqCst);
27    }
28
29    pub fn incrby(&self, n: usize) {
30        self.count.fetch_add(n, Ordering::SeqCst);
31    }
32}
33
34struct ProcessStats {
35    rtt_us: String,
36    quiet: bool,
37    busy: usize,
38    beat: chrono::DateTime<chrono::Utc>,
39    info: ProcessInfo,
40    rss: String,
41}
42
43#[derive(Serialize)]
44struct ProcessInfo {
45    hostname: String,
46    identity: String,
47    started_at: f64,
48    pid: u32,
49    tag: Option<String>,
50    concurrency: usize,
51    queues: Vec<String>,
52    labels: Vec<String>,
53}
54
55pub struct StatsPublisher {
56    hostname: String,
57    identity: String,
58    queues: Vec<String>,
59    started_at: chrono::DateTime<chrono::Utc>,
60    busy_jobs: Counter,
61    concurrency: usize,
62}
63
64fn generate_identity(hostname: &String) -> String {
65    let pid = std::process::id();
66    let mut bytes = [0u8; 12];
67    rand::thread_rng().fill_bytes(&mut bytes);
68    let nonce = hex::encode(bytes);
69
70    format!("{hostname}:{pid}:{nonce}")
71}
72
73impl StatsPublisher {
74    #[must_use]
75    pub fn new(
76        hostname: String,
77        queues: Vec<String>,
78        busy_jobs: Counter,
79        concurrency: usize,
80    ) -> Self {
81        let identity = generate_identity(&hostname);
82        let started_at = chrono::Utc::now();
83
84        Self {
85            hostname,
86            identity,
87            queues,
88            started_at,
89            busy_jobs,
90            concurrency,
91        }
92    }
93
94    // 127.0.0.1:6379> hkeys "yolo_app:DESKTOP-UMSV21A:107068:5075431aeb06"
95    // 1) "rtt_us"
96    // 2) "quiet"
97    // 3) "busy"
98    // 4) "beat"
99    // 5) "info"
100    // 6) "rss"
101    // 127.0.0.1:6379> hget "yolo_app:DESKTOP-UMSV21A:107068:5075431aeb06" info
102    // "{\"hostname\":\"DESKTOP-UMSV21A\",\"started_at\":1658082501.5606177,\"pid\":107068,\"tag\":\"\",\"concurrency\":10,\"queues\":[\"ruby:v1_statistics\",\"ruby:v2_statistics\"],\"labels\":[],\"identity\":\"DESKTOP-UMSV21A:107068:5075431aeb06\"}"
103    // 127.0.0.1:6379> hget "yolo_app:DESKTOP-UMSV21A:107068:5075431aeb06" irss
104    // (nil)
105    pub async fn publish_stats(&self, redis: RedisPool) -> Result<(), Box<dyn std::error::Error>> {
106        let stats = self.create_process_stats().await?;
107        let mut conn = redis.get().await?;
108        let _: () = conn
109            .cmd_with_key("HSET", self.identity.clone())
110            .arg("rss")
111            .arg(stats.rss)
112            .arg("rtt_us")
113            .arg(stats.rtt_us)
114            .arg("busy")
115            .arg(stats.busy)
116            .arg("quiet")
117            .arg(stats.quiet)
118            .arg("beat")
119            .arg(stats.beat.timestamp())
120            .arg("info")
121            .arg(serde_json::to_string(&stats.info)?)
122            .query_async::<()>(conn.unnamespaced_borrow_mut())
123            .await?;
124
125        conn.expire(self.identity.clone(), 30).await?;
126
127        conn.sadd("processes".to_string(), self.identity.clone())
128            .await?;
129
130        Ok(())
131    }
132
133    async fn create_process_stats(&self) -> Result<ProcessStats, Box<dyn std::error::Error>> {
134        #[cfg(feature = "rss-stats")]
135        let rss_in_kb = format!(
136            "{}",
137            simple_process_stats::ProcessStats::get()
138                .await?
139                .memory_usage_bytes
140                / 1024
141        );
142
143        #[cfg(not(feature = "rss-stats"))]
144        let rss_in_kb = "0".to_string();
145
146        Ok(ProcessStats {
147            rtt_us: "0".into(),
148            busy: self.busy_jobs.value(),
149            quiet: false,
150            rss: rss_in_kb,
151
152            beat: chrono::Utc::now(),
153            info: ProcessInfo {
154                concurrency: self.concurrency,
155                hostname: self.hostname.clone(),
156                identity: self.identity.clone(),
157                queues: self.queues.clone(),
158                started_at: self.started_at.clone().timestamp() as f64,
159                pid: std::process::id(),
160
161                // TODO: Fill out labels and tags.
162                labels: vec![],
163                tag: None,
164            },
165        })
166    }
167}