1use crate::RedisPool;
2use rand::RngCore;
3use serde::Serialize;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6
/// A cheaply clonable, thread-safe counter.
///
/// The value lives in an `Arc<AtomicUsize>`, so every clone of a `Counter`
/// observes and mutates the same underlying number.
#[derive(Clone, Debug)]
pub struct Counter {
    /// Shared atomic storage for the current count.
    count: Arc<AtomicUsize>,
}
11
12impl Counter {
13 #[must_use]
14 pub fn new(n: usize) -> Self {
15 Self {
16 count: Arc::new(AtomicUsize::new(n)),
17 }
18 }
19
20 #[must_use]
21 pub fn value(&self) -> usize {
22 self.count.load(Ordering::SeqCst)
23 }
24
25 pub fn decrby(&self, n: usize) {
26 self.count.fetch_sub(n, Ordering::SeqCst);
27 }
28
29 pub fn incrby(&self, n: usize) {
30 self.count.fetch_add(n, Ordering::SeqCst);
31 }
32}
33
34struct ProcessStats {
35 rtt_us: String,
36 quiet: bool,
37 busy: usize,
38 beat: chrono::DateTime<chrono::Utc>,
39 info: ProcessInfo,
40 rss: String,
41}
42
43#[derive(Serialize)]
44struct ProcessInfo {
45 hostname: String,
46 identity: String,
47 started_at: f64,
48 pid: u32,
49 tag: Option<String>,
50 concurrency: usize,
51 queues: Vec<String>,
52 labels: Vec<String>,
53}
54
55pub struct StatsPublisher {
56 hostname: String,
57 identity: String,
58 queues: Vec<String>,
59 started_at: chrono::DateTime<chrono::Utc>,
60 busy_jobs: Counter,
61 concurrency: usize,
62}
63
64fn generate_identity(hostname: &String) -> String {
65 let pid = std::process::id();
66 let mut bytes = [0u8; 12];
67 rand::thread_rng().fill_bytes(&mut bytes);
68 let nonce = hex::encode(bytes);
69
70 format!("{hostname}:{pid}:{nonce}")
71}
72
73impl StatsPublisher {
74 #[must_use]
75 pub fn new(
76 hostname: String,
77 queues: Vec<String>,
78 busy_jobs: Counter,
79 concurrency: usize,
80 ) -> Self {
81 let identity = generate_identity(&hostname);
82 let started_at = chrono::Utc::now();
83
84 Self {
85 hostname,
86 identity,
87 queues,
88 started_at,
89 busy_jobs,
90 concurrency,
91 }
92 }
93
94 pub async fn publish_stats(&self, redis: RedisPool) -> Result<(), Box<dyn std::error::Error>> {
106 let stats = self.create_process_stats().await?;
107 let mut conn = redis.get().await?;
108 let _: () = conn
109 .cmd_with_key("HSET", self.identity.clone())
110 .arg("rss")
111 .arg(stats.rss)
112 .arg("rtt_us")
113 .arg(stats.rtt_us)
114 .arg("busy")
115 .arg(stats.busy)
116 .arg("quiet")
117 .arg(stats.quiet)
118 .arg("beat")
119 .arg(stats.beat.timestamp())
120 .arg("info")
121 .arg(serde_json::to_string(&stats.info)?)
122 .query_async::<()>(conn.unnamespaced_borrow_mut())
123 .await?;
124
125 conn.expire(self.identity.clone(), 30).await?;
126
127 conn.sadd("processes".to_string(), self.identity.clone())
128 .await?;
129
130 Ok(())
131 }
132
133 async fn create_process_stats(&self) -> Result<ProcessStats, Box<dyn std::error::Error>> {
134 #[cfg(feature = "rss-stats")]
135 let rss_in_kb = format!(
136 "{}",
137 simple_process_stats::ProcessStats::get()
138 .await?
139 .memory_usage_bytes
140 / 1024
141 );
142
143 #[cfg(not(feature = "rss-stats"))]
144 let rss_in_kb = "0".to_string();
145
146 Ok(ProcessStats {
147 rtt_us: "0".into(),
148 busy: self.busy_jobs.value(),
149 quiet: false,
150 rss: rss_in_kb,
151
152 beat: chrono::Utc::now(),
153 info: ProcessInfo {
154 concurrency: self.concurrency,
155 hostname: self.hostname.clone(),
156 identity: self.identity.clone(),
157 queues: self.queues.clone(),
158 started_at: self.started_at.clone().timestamp() as f64,
159 pid: std::process::id(),
160
161 labels: vec![],
163 tag: None,
164 },
165 })
166 }
167}