dwd/stat/
percpu.rs

1use core::{
2    cell::UnsafeCell,
3    sync::atomic::{AtomicU64, Ordering},
4};
5use std::{sync::Arc, time::Instant};
6
7use super::{CommonStat, HttpStat, RxStat, SocketStat, TxStat};
8use crate::histogram::{LogHistogram, PerCpuLogHistogram};
9
10#[derive(Debug)]
11pub struct Stat<T, R, S, H> {
12    generator: AtomicU64,
13    pub stats: Vec<Arc<PerCpuStat<T, R, S, H>>>,
14}
15
16impl<T, R, S, H> Stat<T, R, S, H>
17where
18    T: Default,
19    R: Default,
20    S: Default,
21    H: Default,
22{
23    pub fn new(stats: Vec<Arc<PerCpuStat<T, R, S, H>>>) -> Self {
24        Self { generator: AtomicU64::new(0), stats }
25    }
26}
27
28impl<T, R, S, H> CommonStat for Stat<T, R, S, H> {
29    #[inline]
30    fn generator(&self) -> u64 {
31        self.generator.load(Ordering::Relaxed)
32    }
33
34    #[inline]
35    fn on_generator(&self, v: u64) {
36        self.generator.store(v, Ordering::Relaxed);
37    }
38}
39
40impl<R, S, H> TxStat for Stat<TxWorkerStat, R, S, H> {
41    #[inline]
42    fn num_requests(&self) -> u64 {
43        self.stats.iter().map(|v| unsafe { *v.tx.num_requests.get() }).sum()
44    }
45
46    #[inline]
47    fn bytes_tx(&self) -> u64 {
48        self.stats.iter().map(|v| unsafe { *v.tx.bytes_tx.get() }).sum()
49    }
50}
51
52impl<T, S, H> RxStat for Stat<T, RxWorkerStat, S, H> {
53    #[inline]
54    fn num_responses(&self) -> u64 {
55        self.stats.iter().map(|v| unsafe { *v.rx.num_responses.get() }).sum()
56    }
57
58    #[inline]
59    fn num_timeouts(&self) -> u64 {
60        self.stats.iter().map(|v| unsafe { *v.rx.num_timeouts.get() }).sum()
61    }
62
63    #[inline]
64    fn bytes_rx(&self) -> u64 {
65        self.stats.iter().map(|v| unsafe { *v.rx.bytes_rx.get() }).sum()
66    }
67
68    #[inline]
69    fn hist(&self) -> LogHistogram {
70        let mut snapshot = vec![0u64; self.stats.len()];
71        for s in &self.stats {
72            for (idx, b) in s.rx.hist.buckets().iter().enumerate() {
73                snapshot[idx] += unsafe { *b.get() };
74            }
75        }
76
77        LogHistogram::new(snapshot)
78    }
79}
80
81impl<T, R, H> SocketStat for Stat<T, R, SockWorkerStat, H> {
82    #[inline]
83    fn num_sock_created(&self) -> u64 {
84        self.stats
85            .iter()
86            .map(|v| unsafe { *v.sock.num_sock_created.get() })
87            .sum()
88    }
89
90    #[inline]
91    fn num_sock_errors(&self) -> u64 {
92        self.stats
93            .iter()
94            .map(|v| unsafe { *v.sock.num_sock_errors.get() })
95            .sum()
96    }
97}
98
99impl<T, R, S> HttpStat for Stat<T, R, S, HttpWorkerStat> {
100    #[inline]
101    fn num_2xx(&self) -> u64 {
102        self.stats.iter().map(|v| unsafe { *v.http.num_2xx.get() }).sum()
103    }
104
105    #[inline]
106    fn num_3xx(&self) -> u64 {
107        self.stats.iter().map(|v| unsafe { *v.http.num_3xx.get() }).sum()
108    }
109
110    #[inline]
111    fn num_4xx(&self) -> u64 {
112        self.stats.iter().map(|v| unsafe { *v.http.num_4xx.get() }).sum()
113    }
114
115    #[inline]
116    fn num_5xx(&self) -> u64 {
117        self.stats.iter().map(|v| unsafe { *v.http.num_5xx.get() }).sum()
118    }
119}
120
/// Statistics block made of four independently selectable sections.
///
/// Each type parameter defaults to `()`, so a section that is not needed
/// occupies no space and exposes none of the section-specific methods.
#[derive(Default, Debug)]
pub struct PerCpuStat<T = (), R = (), S = (), H = ()> {
    /// Transmission section.
    tx: T,
    /// Reception section.
    rx: R,
    /// Socket section.
    sock: S,
    /// HTTP section.
    http: H,
}
128
129impl<R, S, H> PerCpuStat<TxWorkerStat, R, S, H> {
130    /// Increases the number of requests made by the given value.
131    ///
132    /// Should be called after each successful request transmitted.
133    #[inline]
134    pub fn on_requests(&self, v: u64) {
135        unsafe { *self.tx.num_requests.get() += v };
136    }
137
138    #[inline]
139    pub fn on_send(&self, n: u64) {
140        unsafe { *self.tx.bytes_tx.get() += n };
141    }
142}
143
144impl<T, S, H> PerCpuStat<T, RxWorkerStat, S, H> {
145    /// Increases the number of responses.
146    ///
147    /// Should be called after each successful response received.
148    #[inline]
149    pub fn on_response(&self, now: &Instant) {
150        unsafe { *self.rx.num_responses.get() += 1 };
151        self.rx.hist.record(now.elapsed().as_micros() as u64);
152    }
153
154    #[inline]
155    pub fn on_recv(&self, n: u64) {
156        unsafe { *self.rx.bytes_rx.get() += n };
157    }
158
159    #[inline]
160    pub fn on_timeout(&self, now: &Instant) {
161        unsafe { *self.rx.num_timeouts.get() += 1 };
162        self.rx.hist.record(now.elapsed().as_micros() as u64);
163    }
164}
165
166impl<T, R, H> PerCpuStat<T, R, SockWorkerStat, H> {
167    /// Increases the number of sockets created.
168    #[inline]
169    pub fn on_sock_created(&self) {
170        unsafe { *self.sock.num_sock_created.get() += 1 };
171    }
172
173    /// Increases the number of socket errors.
174    #[inline]
175    pub fn on_sock_err(&self) {
176        unsafe { *self.sock.num_sock_errors.get() += 1 };
177    }
178}
179
180impl<T, R, S> PerCpuStat<T, R, S, HttpWorkerStat> {
181    /// Increases the number of 2xx responses by the given value.
182    #[inline]
183    pub fn on_2xx(&self) {
184        unsafe { *self.http.num_2xx.get() += 1 };
185    }
186
187    /// Increases the number of 3xx responses by the given value.
188    #[inline]
189    pub fn on_3xx(&self) {
190        unsafe { *self.http.num_3xx.get() += 1 };
191    }
192
193    /// Increases the number of 4xx responses by the given value.
194    #[inline]
195    pub fn on_4xx(&self) {
196        unsafe { *self.http.num_4xx.get() += 1 };
197    }
198
199    /// Increases the number of 5xx responses by the given value.
200    #[inline]
201    pub fn on_5xx(&self) {
202        unsafe { *self.http.num_5xx.get() += 1 };
203    }
204}
205
206unsafe impl<T, R, S, H> Sync for PerCpuStat<T, R, S, H>
207where
208    T: Sync,
209    R: Sync,
210    S: Sync,
211    H: Sync,
212{
213}
214
/// Transmission-side counters of a single worker.
///
/// Each counter is a plain `u64` wrapped in an [`UnsafeCell`], which lets it
/// be updated through a shared reference without atomic operations.
#[derive(Default, Debug)]
pub struct TxWorkerStat {
    /// How many requests have been made.
    num_requests: UnsafeCell<u64>,
    /// How many bytes have been transmitted.
    bytes_tx: UnsafeCell<u64>,
}

// SAFETY: `UnsafeCell<u64>` is not `Sync` on its own, so this impl is a manual
// promise that shared access is sound.
// NOTE(review): that promise presumably rests on a single writer per instance
// and on readers tolerating unsynchronized reads — confirm against the users.
unsafe impl Sync for TxWorkerStat {}
225
226/// Per-worker reception statistics.
227#[derive(Debug, Default)]
228pub struct RxWorkerStat {
229    /// Number of responses received.
230    num_responses: UnsafeCell<u64>,
231    /// Number of bytes received.
232    bytes_rx: UnsafeCell<u64>,
233    /// Number of timeouts.
234    num_timeouts: UnsafeCell<u64>,
235    /// Response times histogram.
236    hist: PerCpuLogHistogram,
237}
238
239unsafe impl Sync for RxWorkerStat {}
240
/// Socket-related counters of a single worker.
///
/// Each counter is a plain `u64` wrapped in an [`UnsafeCell`], which lets it
/// be updated through a shared reference without atomic operations.
#[derive(Default, Debug)]
pub struct SockWorkerStat {
    /// How many sockets have been created.
    num_sock_created: UnsafeCell<u64>,
    /// How many socket errors have occurred.
    num_sock_errors: UnsafeCell<u64>,
}

// SAFETY: `UnsafeCell<u64>` is not `Sync` on its own, so this impl is a manual
// promise that shared access is sound.
// NOTE(review): that promise presumably rests on a single writer per instance
// and on readers tolerating unsynchronized reads — confirm against the users.
unsafe impl Sync for SockWorkerStat {}
251
/// HTTP status-class counters of a single worker.
///
/// Each counter is a plain `u64` wrapped in an [`UnsafeCell`], which lets it
/// be updated through a shared reference without atomic operations.
#[derive(Default, Debug)]
pub struct HttpWorkerStat {
    /// How many 2xx responses have been seen.
    num_2xx: UnsafeCell<u64>,
    /// How many 3xx responses have been seen.
    num_3xx: UnsafeCell<u64>,
    /// How many 4xx responses have been seen.
    num_4xx: UnsafeCell<u64>,
    /// How many 5xx responses have been seen.
    num_5xx: UnsafeCell<u64>,
}

// SAFETY: `UnsafeCell<u64>` is not `Sync` on its own, so this impl is a manual
// promise that shared access is sound.
// NOTE(review): that promise presumably rests on a single writer per instance
// and on readers tolerating unsynchronized reads — confirm against the users.
unsafe impl Sync for HttpWorkerStat {}